AHeise commented on a change in pull request #17580:
URL: https://github.com/apache/flink/pull/17580#discussion_r740017920
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -91,9 +106,42 @@ public void testUserDefinedJobNameWithConfigure() {
testJobName(jobName, env);
}
+ @Test
+ public void testConfigureFileSystem() throws Exception {
+ final Configuration config = new Configuration();
+ EXPECTED_CONFIGURATION.forEach(config::setString);
+ final StreamExecutionEnvironment environment = new
LocalStreamEnvironment(config);
+ environment.getCheckpointConfig().setCheckpointStorage("testConfig:/"
+ tmp.newFolder());
+ environment.enableCheckpointing(100);
+ environment.fromElements(1, 2, 3, 4).map((MapFunction<Integer,
Integer>) value -> value);
+ // We only want to enable the configuration check for this test case.
Nevertheless, the
+ // filesystem is loaded and configured for all test cases in this
module.
+ expectConfiguration = true;
Review comment:
Can't you use `SharedReference` here?
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -91,9 +106,42 @@ public void testUserDefinedJobNameWithConfigure() {
testJobName(jobName, env);
}
+ @Test
+ public void testConfigureFileSystem() throws Exception {
+ final Configuration config = new Configuration();
+ EXPECTED_CONFIGURATION.forEach(config::setString);
+ final StreamExecutionEnvironment environment = new
LocalStreamEnvironment(config);
+ environment.getCheckpointConfig().setCheckpointStorage("testConfig:/"
+ tmp.newFolder());
+ environment.enableCheckpointing(100);
+ environment.fromElements(1, 2, 3, 4).map((MapFunction<Integer,
Integer>) value -> value);
+ // We only want to enable the configuration check for this test case.
Nevertheless, the
+ // filesystem is loaded and configured for all test cases in this
module.
+ expectConfiguration = true;
+ environment.execute("configureFilesystem");
+ expectConfiguration = false;
+ }
+
private void testJobName(String prefixOfExpectedJobName,
ExecutionEnvironment env) {
env.fromElements(1, 2, 3).writeAsText("/dev/null");
Plan plan = env.createProgramPlan();
assertTrue(plan.getJobName().startsWith(prefixOfExpectedJobName));
}
+
+ /** Test filesystem to check that configurations are propagated. */
+ public static final class TestFileSystemFactoryWithConfiguration
+ extends TestFileSystem.TestFileSystemFactory {
Review comment:
I probably wouldn't inherit here but just create the factory from
scratch. You are only saving one method here but have more interdependence.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -279,6 +282,10 @@ public void start() throws Exception {
miniClusterConfiguration.getRpcServiceSharing() ==
RpcServiceSharing.SHARED;
try {
+ final PluginManager pluginManager =
+
PluginUtils.createPluginManagerFromRootFolder(configuration);
+ FileSystem.initialize(configuration, pluginManager);
+
Review comment:
Is `MiniCluster` always and only used for `LocalExecutionEnvironment`? I
think the fix is legit but I'm wondering if the description is accurate. Are we
solving an issue in the `LocalExecutor` or in `MiniCluster`?
I also don't quite understand why `ClusterEntrypoint` is not used at all in
`LocalExecutionEnvironment`. @zentol could you please also have peek?
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -91,9 +106,42 @@ public void testUserDefinedJobNameWithConfigure() {
testJobName(jobName, env);
}
+ @Test
+ public void testConfigureFileSystem() throws Exception {
+ final Configuration config = new Configuration();
+ EXPECTED_CONFIGURATION.forEach(config::setString);
+ final StreamExecutionEnvironment environment = new
LocalStreamEnvironment(config);
+ environment.getCheckpointConfig().setCheckpointStorage("testConfig:/"
+ tmp.newFolder());
+ environment.enableCheckpointing(100);
+ environment.fromElements(1, 2, 3, 4).map((MapFunction<Integer,
Integer>) value -> value);
+ // We only want to enable the configuration check for this test case.
Nevertheless, the
+ // filesystem is loaded and configured for all test cases in this
module.
+ expectConfiguration = true;
+ environment.execute("configureFilesystem");
+ expectConfiguration = false;
+ }
+
private void testJobName(String prefixOfExpectedJobName,
ExecutionEnvironment env) {
env.fromElements(1, 2, 3).writeAsText("/dev/null");
Plan plan = env.createProgramPlan();
assertTrue(plan.getJobName().startsWith(prefixOfExpectedJobName));
}
+
+ /** Test filesystem to check that configurations are propagated. */
+ public static final class TestFileSystemFactoryWithConfiguration
+ extends TestFileSystem.TestFileSystemFactory {
+
+ @Override
+ public String getScheme() {
+ return "testConfig";
+ }
+
+ @Override
+ public void configure(Configuration config) {
+ if (!expectConfiguration) {
+ return;
+ }
+ EXPECTED_CONFIGURATION.forEach((k, v) -> assertEquals(v,
config.getString(k, null)));
Review comment:
Use assertion string.
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -91,9 +106,42 @@ public void testUserDefinedJobNameWithConfigure() {
testJobName(jobName, env);
}
+ @Test
+ public void testConfigureFileSystem() throws Exception {
+ final Configuration config = new Configuration();
+ EXPECTED_CONFIGURATION.forEach(config::setString);
+ final StreamExecutionEnvironment environment = new
LocalStreamEnvironment(config);
+ environment.getCheckpointConfig().setCheckpointStorage("testConfig:/"
+ tmp.newFolder());
+ environment.enableCheckpointing(100);
+ environment.fromElements(1, 2, 3, 4).map((MapFunction<Integer,
Integer>) value -> value);
Review comment:
Is this cast needed?
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
##########
@@ -26,13 +26,22 @@
import org.apache.flink.api.java.io.DiscardingOutputFormat;
Review comment:
Commit + PR message should rather read: `Initialize filesystems when run
in LocalStreamEnvironment`.
The issue is not that configuration was incorrectly passed to the fs but
rather that plugins where not initialized at all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]