This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push: new f07df6864e5 [FLINK-28897][runtime] Fix bug of failed to generate JobGraph when using UDF and enable checkpoint f07df6864e5 is described below commit f07df6864e5da21473bf7396774b71ee5482c290 Author: Jark Wu <j...@apache.org> AuthorDate: Tue Sep 6 18:04:17 2022 +0800 [FLINK-28897][runtime] Fix bug of failed to generate JobGraph when using UDF and enable checkpoint This closes #20761 --- .../flink/client/FlinkPipelineTranslationUtil.java | 21 ++++++++---- .../apache/flink/client/StreamGraphTranslator.java | 8 ++++- .../org/apache/flink/client/cli/CliFrontend.java | 4 ++- .../application/executors/EmbeddedExecutor.java | 3 +- .../executors/AbstractJobClusterExecutor.java | 3 +- .../executors/AbstractSessionClusterExecutor.java | 3 +- .../client/deployment/executors/LocalExecutor.java | 7 ++-- .../executors/PipelineExecutorUtils.java | 12 +++++-- .../client/cli/CliFrontendPackageProgramTest.java | 3 +- .../apache/flink/client/program/ClientTest.java | 10 ++++-- .../DefaultPackagedProgramRetrieverTest.java | 3 +- .../table/sql/codegen/UsingRemoteJarITCase.java | 11 +++++++ .../src/test/resources/scalar_udf_e2e.sql | 37 ++++++++++++++++++++++ .../state/api/runtime/OperatorIDGeneratorTest.java | 3 +- .../flink/streaming/api/graph/StreamGraph.java | 7 ++-- .../api/graph/StreamingJobGraphGenerator.java | 24 ++++++++++---- .../MiniClusterPipelineExecutorServiceLoader.java | 3 +- .../checkpointing/ChangelogRecoveryITCaseBase.java | 2 +- .../RescaleCheckpointManuallyITCase.java | 2 +- 19 files changed, 131 insertions(+), 35 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java index 67e8e724d21..eedf204676e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkPipelineTranslationUtil.java @@ -32,9 +32,13 @@ public final class FlinkPipelineTranslationUtil { /** Transmogrifies the given {@link Pipeline} to a {@link JobGraph}. */ public static JobGraph getJobGraph( - Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { + ClassLoader userClassloader, + Pipeline pipeline, + Configuration optimizerConfiguration, + int defaultParallelism) { - FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline); + FlinkPipelineTranslator pipelineTranslator = + getPipelineTranslator(userClassloader, pipeline); return pipelineTranslator.translateToJobGraph( pipeline, optimizerConfiguration, defaultParallelism); @@ -52,26 +56,29 @@ public final class FlinkPipelineTranslationUtil { try { Thread.currentThread().setContextClassLoader(userClassloader); return FlinkPipelineTranslationUtil.getJobGraph( - pipeline, configuration, defaultParallelism); + userClassloader, pipeline, configuration, defaultParallelism); } finally { Thread.currentThread().setContextClassLoader(contextClassLoader); } } /** Extracts the execution plan (as JSON) from the given {@link Pipeline}. */ - public static String translateToJSONExecutionPlan(Pipeline pipeline) { - FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline); + public static String translateToJSONExecutionPlan( + ClassLoader userClassloader, Pipeline pipeline) { + FlinkPipelineTranslator pipelineTranslator = + getPipelineTranslator(userClassloader, pipeline); return pipelineTranslator.translateToJSONExecutionPlan(pipeline); } - private static FlinkPipelineTranslator getPipelineTranslator(Pipeline pipeline) { + private static FlinkPipelineTranslator getPipelineTranslator( + ClassLoader userClassloader, Pipeline pipeline) { PlanTranslator planTranslator = new PlanTranslator(); if (planTranslator.canTranslate(pipeline)) { return planTranslator; } - StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator(); + StreamGraphTranslator streamGraphTranslator = new StreamGraphTranslator(userClassloader); if (streamGraphTranslator.canTranslate(pipeline)) { return streamGraphTranslator; diff --git a/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java b/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java index 3e206c631a0..77e35ed9856 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java +++ b/flink-clients/src/main/java/org/apache/flink/client/StreamGraphTranslator.java @@ -40,6 +40,12 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator { private static final Logger LOG = LoggerFactory.getLogger(StreamGraphTranslator.class); + private final ClassLoader userClassloader; + + public StreamGraphTranslator(ClassLoader userClassloader) { + this.userClassloader = userClassloader; + } + @Override public JobGraph translateToJobGraph( Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) { @@ -47,7 +53,7 @@ public class StreamGraphTranslator implements FlinkPipelineTranslator { pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph."); StreamGraph streamGraph = (StreamGraph) pipeline; - return streamGraph.getJobGraph(null); + return streamGraph.getJobGraph(userClassloader, null); } @Override diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 652df51c292..f603dc90cda 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -355,7 +355,9 @@ public class CliFrontend { Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( program, effectiveConfiguration, parallelism, true); - String jsonPlan = FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline); + String jsonPlan = + FlinkPipelineTranslationUtil.translateToJSONExecutionPlan( + program.getUserCodeClassLoader(), pipeline); if (jsonPlan != null) { System.out.println( diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java index 92995f350c2..7094c1cac13 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java @@ -119,7 +119,8 @@ public class EmbeddedExecutor implements PipelineExecutor { final Time timeout = Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis()); - final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); + final JobGraph jobGraph = + PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader); final JobID actualJobId = jobGraph.getJobID(); this.submittedJobIds.add(actualJobId); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java index 6050488772e..f43e9ef46f9 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractJobClusterExecutor.java @@ -68,7 +68,8 @@ public class AbstractJobClusterExecutor< @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception { - final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); + final JobGraph jobGraph = + PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader); try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java index eeff536de51..52c58e5d035 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java @@ -68,7 +68,8 @@ public class AbstractSessionClusterExecutor< @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception { - final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); + final JobGraph jobGraph = + PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader); try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java index 71f60ff4296..45847a6a62d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java @@ -79,13 +79,14 @@ public class LocalExecutor implements PipelineExecutor { // we only support attached execution with the local executor. checkState(configuration.getBoolean(DeploymentOptions.ATTACHED)); - final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig); + final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader); return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory) .submitJob(jobGraph, userCodeClassloader); } - private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) + private JobGraph getJobGraph( + Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws MalformedURLException { // This is a quirk in how LocalEnvironment used to work. It sets the default parallelism // to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour @@ -101,6 +102,6 @@ public class LocalExecutor implements PipelineExecutor { plan.setDefaultParallelism(slotsPerTaskManager * numTaskManagers); } - return PipelineExecutorUtils.getJobGraph(pipeline, configuration); + return PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java index 06b1ada7d4f..76c5d67cc40 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java @@ -38,14 +38,17 @@ public class PipelineExecutorUtils { /** * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}. * - * @param pipeline the pipeline whose job graph we are computing + * @param pipeline the pipeline whose job graph we are computing. * @param configuration the configuration with the necessary information such as jars and * classpaths to be included, the parallelism of the job and potential savepoint settings * used to bootstrap its state. + * @param userClassloader the classloader which can load user classes. * @return the corresponding {@link JobGraph}. */ public static JobGraph getJobGraph( - @Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) + @Nonnull final Pipeline pipeline, + @Nonnull final Configuration configuration, + @Nonnull ClassLoader userClassloader) throws MalformedURLException { checkNotNull(pipeline); checkNotNull(configuration); @@ -54,7 +57,10 @@ public class PipelineExecutorUtils { ExecutionConfigAccessor.fromConfiguration(configuration); final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph( - pipeline, configuration, executionConfigAccessor.getParallelism()); + userClassloader, + pipeline, + configuration, + executionConfigAccessor.getParallelism()); configuration .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java index 837bc8a99e9..82f95cc08c3 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendPackageProgramTest.java @@ -326,7 +326,8 @@ class CliFrontendPackageProgramTest { // we expect this to fail with a "ClassNotFoundException" Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram(prog, c, 666, true); - FlinkPipelineTranslationUtil.translateToJSONExecutionPlan(pipeline); + FlinkPipelineTranslationUtil.translateToJSONExecutionPlan( + prog.getUserCodeClassLoader(), pipeline); fail("Should have failed with a ClassNotFoundException"); } catch (ProgramInvocationException e) { if (!(e.getCause() instanceof ClassNotFoundException)) { diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index f7a00c0fde8..b1659eb732c 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -281,7 +281,12 @@ class ClientTest { void shouldSubmitToJobClient() { final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster()); - JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, new Configuration(), 1); + JobGraph jobGraph = + FlinkPipelineTranslationUtil.getJobGraph( + Thread.currentThread().getContextClassLoader(), + plan, + new Configuration(), + 1); jobGraph.addJars(Collections.emptyList()); jobGraph.setClasspaths(Collections.emptyList()); @@ -516,7 +521,8 @@ class ClientTest { return (pipeline, config, classLoader) -> { final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM); final JobGraph jobGraph = - FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism); + FlinkPipelineTranslationUtil.getJobGraph( + classLoader, plan, config, parallelism); final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(config); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java index 8cf20164fe5..cb750edbd46 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverTest.java @@ -552,7 +552,8 @@ class DefaultPackagedProgramRetrieverTest { final Pipeline pipeline = PackagedProgramUtils.getPipelineFromProgram( packagedProgram, configuration, defaultParallelism, false); - return PipelineExecutorUtils.getJobGraph(pipeline, configuration); + return PipelineExecutorUtils.getJobGraph( + pipeline, configuration, packagedProgram.getUserCodeClassLoader()); } private static List<String> extractRelativizedURLsForJarsFromDirectory(File directory) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java index a574d1f6459..e0cc0f00c99 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/UsingRemoteJarITCase.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -110,6 +111,16 @@ public class UsingRemoteJarITCase extends SqlITCaseBase { "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}")); } + @Test + public void testScalarUdfWhenCheckpointEnable() throws Exception { + runAndCheckSQL( + "scalar_udf_e2e.sql", + generateReplaceVars(), + 1, + Collections.singletonList( + "{\"before\":null,\"after\":{\"id\":1,\"str\":\"Hello Flink\"},\"op\":\"c\"}")); + } + @Test public void testCreateTemporarySystemFunctionUsingRemoteJar() throws Exception { Map<String, String> replaceVars = generateReplaceVars(); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql new file mode 100644 index 00000000000..ec2d929faa0 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/scalar_udf_e2e.sql @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE JsonTable ( + id INT, + str VARCHAR +) WITH ( + 'connector' = 'filesystem', + 'path' = '$RESULT', + 'sink.rolling-policy.rollover-interval' = '2s', + 'sink.rolling-policy.check-interval' = '2s', + 'format' = 'debezium-json' +); + +ADD JAR '$JAR_PATH'; +create function func1 as 'org.apache.flink.table.toolbox.StringRegexReplaceFunction' LANGUAGE JAVA; + +SET execution.runtime-mode = $MODE; + +INSERT INTO JsonTable +SELECT id, func1(str, 'World', 'Flink') FROM (VALUES (1, 'Hello World')) AS T(id, str); + diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java index 2148c185d36..ebbd9a6ba1c 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/runtime/OperatorIDGeneratorTest.java @@ -59,7 +59,8 @@ public class OperatorIDGeneratorTest { .disableChaining() .addSink(new DiscardingSink<>()); - JobGraph graph = env.getStreamGraph().getJobGraph(new JobID()); + JobGraph graph = + env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), new JobID()); JobVertex vertex = StreamSupport.stream(graph.getVertices().spliterator(), false) .filter(node -> node.getName().contains(OPERATOR_NAME)) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 8420539d798..1542022f200 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -1007,13 +1007,14 @@ public class StreamGraph implements Pipeline { } /** Gets the assembled {@link JobGraph} with a random {@link JobID}. */ + @VisibleForTesting public JobGraph getJobGraph() { - return getJobGraph(null); + return getJobGraph(Thread.currentThread().getContextClassLoader(), null); } /** Gets the assembled {@link JobGraph} with a specified {@link JobID}. */ - public JobGraph getJobGraph(@Nullable JobID jobID) { - return StreamingJobGraphGenerator.createJobGraph(this, jobID); + public JobGraph getJobGraph(ClassLoader userClassLoader, @Nullable JobID jobID) { + return StreamingJobGraphGenerator.createJobGraph(userClassLoader, this, jobID); } public String getStreamingPlanAsJSON() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index c50e7437959..a3a173a5d50 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -126,10 +126,16 @@ public class StreamingJobGraphGenerator { @VisibleForTesting public static JobGraph createJobGraph(StreamGraph streamGraph) { - return new StreamingJobGraphGenerator(streamGraph, null, Runnable::run).createJobGraph(); + return new StreamingJobGraphGenerator( + Thread.currentThread().getContextClassLoader(), + streamGraph, + null, + Runnable::run) + .createJobGraph(); } - public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) { + public static JobGraph createJobGraph( + ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) { // TODO Currently, we construct a new thread pool for the compilation of each job. In the // future, we may refactor the job submission framework and make it reusable across jobs. final ExecutorService serializationExecutor = @@ -141,7 +147,8 @@ public class StreamingJobGraphGenerator { streamGraph.getExecutionConfig().getParallelism())), new ExecutorThreadFactory("flink-operator-serialization-io")); try { - return new StreamingJobGraphGenerator(streamGraph, jobID, serializationExecutor) + return new StreamingJobGraphGenerator( + userClassLoader, streamGraph, jobID, serializationExecutor) .createJobGraph(); } finally { serializationExecutor.shutdown(); @@ -150,6 +157,7 @@ public class StreamingJobGraphGenerator { // ------------------------------------------------------------------------ + private final ClassLoader userClassloader; private final StreamGraph streamGraph; private final Map<Integer, JobVertex> jobVertices; @@ -181,7 +189,11 @@ public class StreamingJobGraphGenerator { private final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs; private StreamingJobGraphGenerator( - StreamGraph streamGraph, @Nullable JobID jobID, Executor serializationExecutor) { + ClassLoader userClassloader, + StreamGraph streamGraph, + @Nullable JobID jobID, + Executor serializationExecutor) { + this.userClassloader = userClassloader; this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher()); @@ -455,11 +467,11 @@ public class StreamingJobGraphGenerator { + "\nThe user can force Unaligned Checkpoints by using 'execution.checkpointing.unaligned.forced'"); } - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); for (StreamNode node : streamGraph.getStreamNodes()) { StreamOperatorFactory operatorFactory = node.getOperatorFactory(); if (operatorFactory != null) { - Class<?> operatorClass = operatorFactory.getStreamOperatorClass(classLoader); + Class<?> operatorClass = + operatorFactory.getStreamOperatorClass(userClassloader); if (InputSelectable.class.isAssignableFrom(operatorClass)) { throw new UnsupportedOperationException( diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java index c55118202fa..c86639f76b3 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java @@ -156,7 +156,8 @@ public class MiniClusterPipelineExecutorServiceLoader implements PipelineExecuto public CompletableFuture<JobClient> execute( Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassLoader) throws Exception { - final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration); + final JobGraph jobGraph = + PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassLoader); if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none() && pipeline instanceof StreamGraph) { jobGraph.setSavepointRestoreSettings( diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java index b63ddf53e8d..7db742bb295 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java @@ -206,7 +206,7 @@ public abstract class ChangelogRecoveryITCaseBase extends TestLogger { Collector<Integer> out) {} }) .addSink(new DiscardingSink<>()); - return env.getStreamGraph().getJobGraph(jobId); + return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobId); } protected void waitAndAssert(JobGraph jobGraph) throws Exception { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java index 3419f6c3df7..89313227c75 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java @@ -278,7 +278,7 @@ public class RescaleCheckpointManuallyITCase extends TestLogger { result.addSink(new CollectionSink<>()); - return env.getStreamGraph().getJobGraph(jobID.get()); + return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get()); } private static class NotifyingDefiniteKeySource extends RichParallelSourceFunction<Integer> {