This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0388b760fc6 [FLINK-33316][runtime] Using SERIALIZED_UDF_CLASS instead of SERIALIZED_UDF_CLASS_NAME
0388b760fc6 is described below

commit 0388b760fc66975c70f797ad07f2e073738a7171
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Thu Oct 26 11:28:48 2023 +0800

    [FLINK-33316][runtime] Using SERIALIZED_UDF_CLASS instead of SERIALIZED_UDF_CLASS_NAME
---
 .../apache/flink/streaming/api/graph/StreamConfig.java   | 15 ++++++++++-----
 .../flink/streaming/runtime/tasks/OperatorChain.java     | 16 +++++++++++++---
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 2fb5b81d4a5..79a5e904ea9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -83,7 +83,7 @@ public class StreamConfig implements Serializable {
      * Introduce serializedUdfClassName to avoid unnecessarily heavy {@link
      * #getStreamOperatorFactory}.
      */
-    public static final String SERIALIZED_UDF_CLASS_NAME = 
"serializedUdfClassName";
+    public static final String SERIALIZED_UDF_CLASS = "serializedUdfClass";
 
     private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
     private static final String NUMBER_OF_NETWORK_INPUTS = 
"numberOfNetworkInputs";
@@ -374,7 +374,7 @@ public class StreamConfig implements Serializable {
     public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
         if (factory != null) {
             toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
-            config.setString(SERIALIZED_UDF_CLASS_NAME, 
factory.getClass().getName());
+            toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, 
factory.getClass());
         }
     }
 
@@ -406,8 +406,13 @@ public class StreamConfig implements Serializable {
         }
     }
 
-    public String getStreamOperatorFactoryClassName() {
-        return config.getString(SERIALIZED_UDF_CLASS_NAME, null);
+    public <T extends StreamOperatorFactory<?>> Class<T> 
getStreamOperatorFactoryClass(
+            ClassLoader cl) {
+        try {
+            return InstantiationUtil.readObjectFromConfig(this.config, 
SERIALIZED_UDF_CLASS, cl);
+        } catch (Exception e) {
+            throw new StreamTaskException("Could not instantiate serialized 
udf class.", e);
+        }
     }
 
     public void setIterationId(String iterationId) {
@@ -768,7 +773,7 @@ public class StreamConfig implements Serializable {
 
         try {
             builder.append("\nOperator: ")
-                    
.append(getStreamOperatorFactory(cl).getClass().getSimpleName());
+                    .append(getStreamOperatorFactoryClass(cl).getSimpleName());
         } catch (Exception e) {
             builder.append("\nOperator: Missing");
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 1afee0f75b1..2a7a8dc3c1b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -641,7 +641,10 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
     @Nullable
     private Counter getOperatorRecordsOutCounter(
             StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
-        String streamOperatorFactoryClassName = 
operatorConfig.getStreamOperatorFactoryClassName();
+        ClassLoader userCodeClassloader = 
containingTask.getUserCodeClassLoader();
+        Class<StreamOperatorFactory<?>> streamOperatorFactoryClass =
+                
operatorConfig.getStreamOperatorFactoryClass(userCodeClassloader);
+
         // Do not use the numRecordsOut counter on output if this operator is 
SinkWriterOperator.
         //
         // Metric "numRecordsOut" is defined as the total number of records 
written to the
@@ -649,8 +652,15 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
         // number of records sent to downstream operators, which is number of 
Committable batches
         // sent to SinkCommitter. So we skip registering this metric on output 
and leave this metric
         // to sink writer implementations to report.
-        if 
(SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName))
 {
-            return null;
+        try {
+            Class<?> sinkWriterFactoryClass =
+                    
userCodeClassloader.loadClass(SinkWriterOperatorFactory.class.getName());
+            if 
(sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)) {
+                return null;
+            }
+        } catch (ClassNotFoundException e) {
+            throw new StreamTaskException(
+                    "Could not load SinkWriterOperatorFactory class from 
userCodeClassloader.", e);
         }
 
         InternalOperatorMetricGroup operatorMetricGroup =

Reply via email to