zhuzhurk commented on code in PR #25472:
URL: https://github.com/apache/flink/pull/25472#discussion_r1808402242
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java:
##########
@@ -80,6 +83,14 @@ public SchedulerNG createInstance(
final Collection<FailureEnricher> failureEnrichers,
final BlocklistOperations blocklistOperations)
throws Exception {
+ JobGraph jobGraph;
+
+ if (executionPlan instanceof JobGraph) {
+ jobGraph = (JobGraph) executionPlan;
+ } else {
+ checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");
Review Comment:
maybe
```
} else if (executionPlan instanceof StreamGraph) {
    ...
} else {
    throw new FlinkException(
            "Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
}
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java:
##########
@@ -1066,6 +1067,14 @@ public CompletableFuture<JobSubmissionResult> submitJob(ExecutionPlan executionP
// When MiniCluster uses the local RPC, the provided ExecutionPlan is passed directly to the
// Dispatcher. This means that any mutations to the JG can affect the Dispatcher behaviour,
// so we rather clone it to guard against this.
Review Comment:
The comments above are for the original line.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -169,10 +225,15 @@ public ExecutionConfig getExecutionConfig() {
return executionConfig;
}
+ @Override
public Configuration getJobConfiguration() {
return jobConfiguration;
}
+ public void setJobConfiguration(Configuration configuration) {
Review Comment:
Is this method required? If not, we can keep `jobConfiguration` final.
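If it isn't needed, a minimal sketch of the alternative (the constructor shape here is illustrative only, not the PR's actual signature):
```
// keep the field final and populate it once at construction time
private final Configuration jobConfiguration;

public StreamGraph(Configuration jobConfiguration /*, other constructor args */) {
    this.jobConfiguration = Preconditions.checkNotNull(jobConfiguration);
    // ... rest of construction
}
```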
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1081,4 +1269,345 @@ public void setAttribute(Integer vertexId, Attribute attribute) {
getStreamNode(vertexId).setAttribute(attribute);
}
}
+
+ public void setJobId(JobID jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return jobId;
+ }
+
+ /**
+ * Sets the classpath required to run the job on a task manager.
+ *
+ * @param paths paths of the directories/JAR files required to run the job on a task manager
+ */
+ public void setClasspath(List<URL> paths) {
+ classpath = paths;
+ }
+
+ public List<URL> getClasspath() {
+ return classpath;
+ }
+
+ /**
+ * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}.
+ *
+ * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files to attach to the
+ * jobgraph.
+ * @throws RuntimeException if a jar URL is not valid.
+ */
+ public void addJars(final List<URL> jarFilesToAttach) {
+ for (URL jar : jarFilesToAttach) {
+ try {
+ addJar(new Path(jar.toURI()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("URL is invalid. This should not happen.", e);
+ }
+ }
+ }
+
+ /**
+ * Returns a list of BLOB keys referring to the JAR files required to run this job.
+ *
+ * @return list of BLOB keys referring to the JAR files required to run this job
+ */
+ @Override
+ public List<PermanentBlobKey> getUserJarBlobKeys() {
+ return this.userJarBlobKeys;
+ }
+
+ @Override
+ public List<URL> getClasspaths() {
+ return classpath;
+ }
+
+ public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
+ if (file == null) {
+ throw new IllegalArgumentException();
+ }
+
+ userArtifacts.putIfAbsent(name, file);
+ }
+
+ @Override
+ public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
+ return userArtifacts;
+ }
+
+ @Override
+ public void addUserJarBlobKey(PermanentBlobKey key) {
+ if (key == null) {
+ throw new IllegalArgumentException();
+ }
+
+ if (!userJarBlobKeys.contains(key)) {
+ userJarBlobKeys.add(key);
+ }
+ }
+
+ @Override
+ public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey)
+ throws IOException {
+ byte[] serializedBlobKey;
+ serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+
+ userArtifacts.computeIfPresent(
+ entryName,
+ (key, originalEntry) ->
+ new DistributedCache.DistributedCacheEntry(
+ originalEntry.filePath,
+ originalEntry.isExecutable,
+ serializedBlobKey,
+ originalEntry.isZipped));
+ }
+
+ @Override
+ public void writeUserArtifactEntriesToConfiguration() {
+ for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact :
+ userArtifacts.entrySet()) {
+ DistributedCache.writeFileInfoToConfig(
+ userArtifact.getKey(), userArtifact.getValue(), jobConfiguration);
+ }
+ }
+
+ @Override
+ public int getMaximumParallelism() {
+ int maxParallelism = -1;
+ for (StreamNode node : streamNodes.values()) {
+ maxParallelism = Math.max(node.getParallelism(), maxParallelism);
+ }
+ return maxParallelism;
+ }
+
+ public void setInitialClientHeartbeatTimeout(long initialClientHeartbeatTimeout) {
+ this.initialClientHeartbeatTimeout = initialClientHeartbeatTimeout;
+ }
+
+ @Override
+ public long getInitialClientHeartbeatTimeout() {
+ return initialClientHeartbeatTimeout;
+ }
+
+ @Override
+ public boolean isPartialResourceConfigured() {
+ return isPartialResourceConfigured;
+ }
+
+ public void serializeUserDefinedInstances() throws IOException {
+ final ExecutorService serializationExecutor =
+ Executors.newFixedThreadPool(
+ Math.max(
+ 1,
+ Math.min(
+ Hardware.getNumberCPUCores(),
+ getExecutionConfig().getParallelism())),
+ new ExecutorThreadFactory("flink-operator-serialization-io"));
+ try {
+ this.userDefinedObjectsHolder =
+ new UserDefinedObjectsHolder(
+ streamNodes,
+ virtualSideOutputNodes,
+ virtualPartitionNodes,
+ executionConfig,
+ stateBackend,
+ checkpointStorage,
+ serializationExecutor);
+ this.isPartialResourceConfigured = isPartialResourceConfiguredGraph();
+ this.isEmpty = streamNodes.isEmpty();
+ } finally {
+ serializationExecutor.shutdown();
+ }
+ }
+
+ public void deserializeUserDefinedInstances(
+ ClassLoader userClassLoader, Executor serializationExecutor) throws Exception {
+ this.userDefinedObjectsHolder.deserialize(userClassLoader, serializationExecutor);
+ }
+
+ private boolean isPartialResourceConfiguredGraph() {
+ boolean hasVerticesWithUnknownResource = false;
+ boolean hasVerticesWithConfiguredResource = false;
+
+ for (StreamNode streamNode : this.getStreamNodes()) {
+ if (streamNode.getMinResources() == ResourceSpec.UNKNOWN) {
+ hasVerticesWithUnknownResource = true;
+ } else {
+ hasVerticesWithConfiguredResource = true;
+ }
+
+ if (hasVerticesWithUnknownResource && hasVerticesWithConfiguredResource) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "StreamGraph(jobId: " + jobId + ")";
+ }
+
+ /**
+ * A static inner class designed to hold user-defined objects for serialization and
+ * deserialization in the stream graph.
+ */
+ private class UserDefinedObjectsHolder implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final SerializedValue<
+ Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>>>
+ serializedVirtualPartitionNodes;
+
+ private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
+
+ private SerializedValue<Map<Integer, StreamNode>> serializedStreamNodes;
+
+ /**
+ * This collection stores operator factories serialized separately from the {@link
+ * StreamGraph}. This separation allows for the parallel serialization of operator
+ * factories, improving the overall performance of the serialization process.
+ *
+ * <p>Each tuple in this collection consists of an integer key that identifies the stream
+ * node, and a value that wraps the serialized representation of the associated {@link
+ * StreamOperatorFactory} instance.
+ */
+ private Collection<Tuple2<Integer, SerializedValue<StreamOperatorFactory<?>>>>
+ streamNodeToSerializedOperatorFactories;
+
+ private final SerializedValue<Map<Integer, Tuple2<Integer, OutputTag>>>
+ serializedVirtualSideOutputNodes;
+
+ public UserDefinedObjectsHolder(
+ Map<Integer, StreamNode> streamNodes,
+ Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes,
+ Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>>
+ virtualPartitionNodes,
+ ExecutionConfig executionConfig,
+ @Nullable StateBackend stateBackend,
+ @Nullable CheckpointStorage checkpointStorage,
+ Executor serializationExecutor)
+ throws IOException {
+ serializeStreamNodes(streamNodes, serializationExecutor);
+
+ this.serializedVirtualSideOutputNodes = new SerializedValue<>(virtualSideOutputNodes);
+ this.serializedVirtualPartitionNodes = new SerializedValue<>(virtualPartitionNodes);
+ this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
+
+ if (stateBackend != null && serializedStateBackend == null) {
+ serializedStateBackend = new SerializedValue<>(stateBackend);
+ }
+
+ if (checkpointStorage != null && serializedCheckpointStorage == null) {
+ serializedCheckpointStorage = new SerializedValue<>(checkpointStorage);
+ }
Review Comment:
How about ensuring `JobCheckpointingSettings` is created right after `stateBackend` and `checkpointStorage` are set, and letting it be the holder of `serializedStateBackend` and `serializedCheckpointStorage`? It looks to me like this could simplify things a lot.
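For example (a rough sketch only; names beyond those in the hunk above are illustrative): if the settings object already exists by the time the graph is serialized, the holder can simply carry it:
```
private final JobCheckpointingSettings checkpointingSettings;

public UserDefinedObjectsHolder(
        Map<Integer, StreamNode> streamNodes,
        Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes,
        Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes,
        ExecutionConfig executionConfig,
        JobCheckpointingSettings checkpointingSettings,
        Executor serializationExecutor)
        throws IOException {
    serializeStreamNodes(streamNodes, serializationExecutor);

    this.serializedVirtualSideOutputNodes = new SerializedValue<>(virtualSideOutputNodes);
    this.serializedVirtualPartitionNodes = new SerializedValue<>(virtualPartitionNodes);
    this.serializedExecutionConfig = new SerializedValue<>(executionConfig);

    // JobCheckpointingSettings already owns the serialized state backend and
    // checkpoint storage, so the separate serializedStateBackend /
    // serializedCheckpointStorage fields and their null checks go away here.
    this.checkpointingSettings = checkpointingSettings;
}
```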
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -1081,4 +1269,345 @@ public void setAttribute(Integer vertexId, Attribute attribute) {
getStreamNode(vertexId).setAttribute(attribute);
}
}
+
+ public void setJobId(JobID jobId) {
+ this.jobId = jobId;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return jobId;
+ }
+
+ /**
+ * Sets the classpath required to run the job on a task manager.
+ *
+ * @param paths paths of the directories/JAR files required to run the job on a task manager
+ */
+ public void setClasspath(List<URL> paths) {
+ classpath = paths;
+ }
+
+ public List<URL> getClasspath() {
+ return classpath;
+ }
+
+ /**
+ * Adds the given jar files to the {@link JobGraph} via {@link JobGraph#addJar}.
+ *
+ * @param jarFilesToAttach a list of the {@link URL URLs} of the jar files to attach to the
+ * jobgraph.
+ * @throws RuntimeException if a jar URL is not valid.
+ */
+ public void addJars(final List<URL> jarFilesToAttach) {
+ for (URL jar : jarFilesToAttach) {
+ try {
+ addJar(new Path(jar.toURI()));
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("URL is invalid. This should not happen.", e);
+ }
+ }
+ }
+
+ /**
+ * Returns a list of BLOB keys referring to the JAR files required to run this job.
+ *
+ * @return list of BLOB keys referring to the JAR files required to run this job
+ */
+ @Override
+ public List<PermanentBlobKey> getUserJarBlobKeys() {
+ return this.userJarBlobKeys;
+ }
+
+ @Override
+ public List<URL> getClasspaths() {
+ return classpath;
+ }
+
+ public void addUserArtifact(String name, DistributedCache.DistributedCacheEntry file) {
+ if (file == null) {
+ throw new IllegalArgumentException();
+ }
+
+ userArtifacts.putIfAbsent(name, file);
+ }
+
+ @Override
+ public Map<String, DistributedCache.DistributedCacheEntry> getUserArtifacts() {
+ return userArtifacts;
+ }
+
+ @Override
+ public void addUserJarBlobKey(PermanentBlobKey key) {
+ if (key == null) {
+ throw new IllegalArgumentException();
+ }
+
+ if (!userJarBlobKeys.contains(key)) {
+ userJarBlobKeys.add(key);
+ }
+ }
+
+ @Override
+ public void setUserArtifactBlobKey(String entryName, PermanentBlobKey blobKey)
+ throws IOException {
+ byte[] serializedBlobKey;
+ serializedBlobKey = InstantiationUtil.serializeObject(blobKey);
+
+ userArtifacts.computeIfPresent(
+ entryName,
+ (key, originalEntry) ->
+ new DistributedCache.DistributedCacheEntry(
+ originalEntry.filePath,
+ originalEntry.isExecutable,
+ serializedBlobKey,
+ originalEntry.isZipped));
+ }
+
+ @Override
+ public void writeUserArtifactEntriesToConfiguration() {
+ for (Map.Entry<String, DistributedCache.DistributedCacheEntry> userArtifact :
+ userArtifacts.entrySet()) {
+ DistributedCache.writeFileInfoToConfig(
+ userArtifact.getKey(), userArtifact.getValue(), jobConfiguration);
+ }
+ }
+
+ @Override
+ public int getMaximumParallelism() {
+ int maxParallelism = -1;
+ for (StreamNode node : streamNodes.values()) {
+ maxParallelism = Math.max(node.getParallelism(), maxParallelism);
+ }
+ return maxParallelism;
+ }
+
+ public void setInitialClientHeartbeatTimeout(long initialClientHeartbeatTimeout) {
+ this.initialClientHeartbeatTimeout = initialClientHeartbeatTimeout;
+ }
+
+ @Override
+ public long getInitialClientHeartbeatTimeout() {
+ return initialClientHeartbeatTimeout;
+ }
+
+ @Override
+ public boolean isPartialResourceConfigured() {
+ return isPartialResourceConfigured;
+ }
+
+ public void serializeUserDefinedInstances() throws IOException {
+ final ExecutorService serializationExecutor =
+ Executors.newFixedThreadPool(
+ Math.max(
+ 1,
+ Math.min(
+ Hardware.getNumberCPUCores(),
+ getExecutionConfig().getParallelism())),
+ new ExecutorThreadFactory("flink-operator-serialization-io"));
+ try {
+ this.userDefinedObjectsHolder =
+ new UserDefinedObjectsHolder(
+ streamNodes,
+ virtualSideOutputNodes,
+ virtualPartitionNodes,
+ executionConfig,
+ stateBackend,
+ checkpointStorage,
+ serializationExecutor);
+ this.isPartialResourceConfigured = isPartialResourceConfiguredGraph();
+ this.isEmpty = streamNodes.isEmpty();
Review Comment:
It looks to me like the two lines above are unrelated to the serialization work.
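E.g. (sketch only; the method name is illustrative) they could move into a step that runs wherever the graph is finalized, independent of serialization:
```
private void updateGraphProperties() {
    this.isPartialResourceConfigured = isPartialResourceConfiguredGraph();
    this.isEmpty = streamNodes.isEmpty();
}
```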
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]