This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 0388b760fc6 [FLINK-33316][runtime] Using SERIALIZED_UDF_CLASS instead of SERIALIZED_UDF_CLASS_NAME
0388b760fc6 is described below

commit 0388b760fc66975c70f797ad07f2e073738a7171
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Thu Oct 26 11:28:48 2023 +0800

    [FLINK-33316][runtime] Using SERIALIZED_UDF_CLASS instead of SERIALIZED_UDF_CLASS_NAME
---
 .../apache/flink/streaming/api/graph/StreamConfig.java    | 15 ++++++++++-----
 .../flink/streaming/runtime/tasks/OperatorChain.java      | 16 +++++++++++++---
 2 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 2fb5b81d4a5..79a5e904ea9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -83,7 +83,7 @@ public class StreamConfig implements Serializable {
      * Introduce serializedUdfClassName to avoid unnecessarily heavy {@link
      * #getStreamOperatorFactory}.
      */
-    public static final String SERIALIZED_UDF_CLASS_NAME = "serializedUdfClassName";
+    public static final String SERIALIZED_UDF_CLASS = "serializedUdfClass";
 
     private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
     private static final String NUMBER_OF_NETWORK_INPUTS = "numberOfNetworkInputs";
@@ -374,7 +374,7 @@ public class StreamConfig implements Serializable {
     public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
         if (factory != null) {
             toBeSerializedConfigObjects.put(SERIALIZED_UDF, factory);
-            config.setString(SERIALIZED_UDF_CLASS_NAME, factory.getClass().getName());
+            toBeSerializedConfigObjects.put(SERIALIZED_UDF_CLASS, factory.getClass());
         }
     }
 
@@ -406,8 +406,13 @@ public class StreamConfig implements Serializable {
         }
     }
 
-    public String getStreamOperatorFactoryClassName() {
-        return config.getString(SERIALIZED_UDF_CLASS_NAME, null);
+    public <T extends StreamOperatorFactory<?>> Class<T> getStreamOperatorFactoryClass(
+            ClassLoader cl) {
+        try {
+            return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZED_UDF_CLASS, cl);
+        } catch (Exception e) {
+            throw new StreamTaskException("Could not instantiate serialized udf class.", e);
+        }
     }
 
     public void setIterationId(String iterationId) {
@@ -768,7 +773,7 @@ public class StreamConfig implements Serializable {
 
         try {
             builder.append("\nOperator: ")
-                    .append(getStreamOperatorFactory(cl).getClass().getSimpleName());
+                    .append(getStreamOperatorFactoryClass(cl).getSimpleName());
         } catch (Exception e) {
             builder.append("\nOperator: Missing");
         }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 1afee0f75b1..2a7a8dc3c1b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -641,7 +641,10 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
     @Nullable
     private Counter getOperatorRecordsOutCounter(
             StreamTask<?, ?> containingTask, StreamConfig operatorConfig) {
-        String streamOperatorFactoryClassName = operatorConfig.getStreamOperatorFactoryClassName();
+        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
+        Class<StreamOperatorFactory<?>> streamOperatorFactoryClass =
+                operatorConfig.getStreamOperatorFactoryClass(userCodeClassloader);
+
         // Do not use the numRecordsOut counter on output if this operator is SinkWriterOperator.
         //
         // Metric "numRecordsOut" is defined as the total number of records written to the
@@ -649,8 +652,15 @@ public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>>
         // number of records sent to downstream operators, which is number of Committable batches
         // sent to SinkCommitter. So we skip registering this metric on output and leave this metric
         // to sink writer implementations to report.
-        if (SinkWriterOperatorFactory.class.getName().equals(streamOperatorFactoryClassName)) {
-            return null;
+        try {
+            Class<?> sinkWriterFactoryClass =
+                    userCodeClassloader.loadClass(SinkWriterOperatorFactory.class.getName());
+            if (sinkWriterFactoryClass.isAssignableFrom(streamOperatorFactoryClass)) {
+                return null;
+            }
+        } catch (ClassNotFoundException e) {
+            throw new StreamTaskException(
+                    "Could not load SinkWriterOperatorFactory class from userCodeClassloader.", e);
         }
 
         InternalOperatorMetricGroup operatorMetricGroup =