This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e0bb5e947a [FLINK-26675][runtime] Parallelized heavy serialization in 
StreamingJobGraphGenerator
8e0bb5e947a is described below

commit 8e0bb5e947a6b4f09ca1df68c820b9f45425d411
Author: Yangze Guo <karma...@gmail.com>
AuthorDate: Wed Mar 16 15:56:28 2022 +0800

    [FLINK-26675][runtime] Parallelized heavy serialization in 
StreamingJobGraphGenerator
    
    This closes #19108.
---
 .../flink/state/api/BootstrapTransformation.java   |   1 +
 .../state/api/StateBootstrapTransformation.java    |   1 +
 .../flink/streaming/api/graph/StreamConfig.java    | 174 +++++++++++----------
 .../api/graph/StreamingJobGraphGenerator.java      |  76 +++++++--
 .../StreamSourceOperatorLatencyMetricsTest.java    |   1 +
 .../tasks/InterruptSensitiveRestoreTest.java       |   1 +
 .../runtime/tasks/OneInputStreamTaskTest.java      |   3 +-
 .../tasks/OneInputStreamTaskTestHarness.java       |   2 +
 .../streaming/runtime/tasks/OperatorChainTest.java |   2 +-
 .../runtime/tasks/StreamConfigChainer.java         |  10 +-
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |   6 +-
 .../runtime/tasks/StreamTaskSystemExitTest.java    |   1 +
 .../runtime/tasks/StreamTaskTerminationTest.java   |   1 +
 .../streaming/runtime/tasks/StreamTaskTest.java    |   4 +-
 .../runtime/tasks/StreamTaskTestHarness.java       |   4 +
 .../tasks/TaskCheckpointingBehaviourTest.java      |   1 +
 .../AbstractStreamOperatorTestHarnessTest.java     |   1 +
 .../util/KeyedBroadcastOperatorTestHarness.java    |   1 +
 .../KeyedMultiInputStreamOperatorTestHarness.java  |   2 +
 .../KeyedOneInputStreamOperatorTestHarness.java    |   2 +
 .../KeyedTwoInputStreamOperatorTestHarness.java    |   1 +
 .../flink/streaming/util/MockStreamConfig.java     |   1 +
 .../util/OneInputStreamOperatorTestHarness.java    |   6 +
 .../MultipleInputStreamOperatorBase.java           |   1 +
 24 files changed, 202 insertions(+), 101 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
index 66f75f3a1c7..a58fb1e5174 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java
@@ -235,6 +235,7 @@ public class BootstrapTransformation<T> {
         // This means leaving this stateBackend unwrapped.
         config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
         
config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND,
 1.0);
+        config.serializeAllConfigs();
         return config;
     }
 
diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
index 615067c1241..64a898df1ce 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java
@@ -223,6 +223,7 @@ public class StateBootstrapTransformation<T> {
         // This means leaving this stateBackend unwrapped.
         config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE);
         
config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND,
 1.0);
+        config.serializeAllConfigs();
         return config;
     }
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index b1bc3d633a0..8c85b4343d2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -44,6 +44,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TernaryBoolean;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -54,6 +55,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -132,6 +136,15 @@ public class StreamConfig implements Serializable {
 
     private final Configuration config;
 
+    // To make the parallelization of the StreamConfig serialization easier, 
we use this map
+    // to collect all the need-to-be-serialized objects. These objects will be 
serialized all at
+    // once then.
+    private final transient Map<String, Object> toBeSerializedConfigObjects = 
new HashMap<>();
+    private final transient Map<Integer, CompletableFuture<StreamConfig>> 
chainedTaskFutures =
+            new HashMap<>();
+    private final transient CompletableFuture<StreamConfig> 
serializationFuture =
+            new CompletableFuture<>();
+
     public StreamConfig(Configuration config) {
         this.config = config;
     }
@@ -140,6 +153,67 @@ public class StreamConfig implements Serializable {
         return config;
     }
 
+    public CompletableFuture<StreamConfig> getSerializationFuture() {
+        return serializationFuture;
+    }
+
+    /** Trigger the object config serialization and return the completable 
future. */
+    public CompletableFuture<StreamConfig> triggerSerializationAndReturnFuture(
+            Executor ioExecutor) {
+        FutureUtils.combineAll(chainedTaskFutures.values())
+                .thenAcceptAsync(
+                        chainedConfigs -> {
+                            // Serialize all the objects to config.
+                            serializeAllConfigs();
+
+                            try {
+                                InstantiationUtil.writeObjectToConfig(
+                                        chainedConfigs.stream()
+                                                .collect(
+                                                        Collectors.toMap(
+                                                                
StreamConfig::getVertexID,
+                                                                
Function.identity())),
+                                        this.config,
+                                        CHAINED_TASK_CONFIG);
+                            } catch (IOException e) {
+                                throw new StreamTaskException(
+                                        "Could not serialize object for key 
chained task config.",
+                                        e);
+                            }
+                            serializationFuture.complete(this);
+                        },
+                        ioExecutor);
+        return serializationFuture;
+    }
+
+    /**
+     * Serialize all object configs synchronously. Only used for operators 
which need to reconstruct
+     * the StreamConfig internally or test.
+     */
+    public void serializeAllConfigs() {
+        toBeSerializedConfigObjects.forEach(
+                (key, object) -> {
+                    try {
+                        InstantiationUtil.writeObjectToConfig(object, 
this.config, key);
+                    } catch (IOException e) {
+                        throw new StreamTaskException(
+                                String.format("Could not serialize object for 
key %s.", key), e);
+                    }
+                });
+    }
+
+    @VisibleForTesting
+    public void setAndSerializeTransitiveChainedTaskConfigs(
+            Map<Integer, StreamConfig> chainedTaskConfigs) {
+        try {
+            InstantiationUtil.writeObjectToConfig(
+                    chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
+        } catch (IOException e) {
+            throw new StreamTaskException(
+                    "Could not serialize object for key chained task config.", 
e);
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Configured Properties
     // ------------------------------------------------------------------------
@@ -232,11 +306,7 @@ public class StreamConfig implements Serializable {
     }
 
     private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
-        try {
-            InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, 
key);
-        } catch (IOException e) {
-            throw new StreamTaskException("Could not serialize type 
serializer.", e);
-        }
+        toBeSerializedConfigObjects.put(key, typeWrapper);
     }
 
     public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> 
outputTag, ClassLoader cl) {
@@ -258,11 +328,7 @@ public class StreamConfig implements Serializable {
     }
 
     public void setInputs(InputConfig... inputs) {
-        try {
-            InstantiationUtil.writeObjectToConfig(inputs, this.config, INPUTS);
-        } catch (IOException e) {
-            throw new StreamTaskException("Could not serialize inputs.", e);
-        }
+        toBeSerializedConfigObjects.put(INPUTS, inputs);
     }
 
     public InputConfig[] getInputs(ClassLoader cl) {
@@ -304,12 +370,7 @@ public class StreamConfig implements Serializable {
 
     public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) {
         if (factory != null) {
-            try {
-                InstantiationUtil.writeObjectToConfig(factory, this.config, 
SERIALIZEDUDF);
-            } catch (IOException e) {
-                throw new StreamTaskException(
-                        "Cannot serialize operator object " + 
factory.getClass() + ".", e);
-            }
+            toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory);
         }
     }
 
@@ -374,11 +435,7 @@ public class StreamConfig implements Serializable {
     }
 
     public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
-        try {
-            InstantiationUtil.writeObjectToConfig(outputvertexIDs, 
this.config, NONCHAINED_OUTPUTS);
-        } catch (IOException e) {
-            throw new StreamTaskException("Cannot serialize non chained 
outputs.", e);
-        }
+        toBeSerializedConfigObjects.put(NONCHAINED_OUTPUTS, outputvertexIDs);
     }
 
     public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
@@ -392,11 +449,7 @@ public class StreamConfig implements Serializable {
     }
 
     public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
-        try {
-            InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, 
CHAINED_OUTPUTS);
-        } catch (IOException e) {
-            throw new StreamTaskException("Cannot serialize chained outputs.", 
e);
-        }
+        toBeSerializedConfigObjects.put(CHAINED_OUTPUTS, chainedOutputs);
     }
 
     public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
@@ -410,11 +463,7 @@ public class StreamConfig implements Serializable {
     }
 
     public void setInPhysicalEdges(List<StreamEdge> inEdges) {
-        try {
-            InstantiationUtil.writeObjectToConfig(inEdges, this.config, 
IN_STREAM_EDGES);
-        } catch (IOException e) {
-            throw new StreamTaskException("Cannot serialize inward edges.", e);
-        }
+        toBeSerializedConfigObjects.put(IN_STREAM_EDGES, inEdges);
     }
 
     public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
@@ -472,11 +521,7 @@ public class StreamConfig implements Serializable {
     }
 
     public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
-        try {
-            InstantiationUtil.writeObjectToConfig(outEdgeList, this.config, 
EDGES_IN_ORDER);
-        } catch (IOException e) {
-            throw new StreamTaskException("Could not serialize outputs in 
order.", e);
-        }
+        toBeSerializedConfigObjects.put(EDGES_IN_ORDER, outEdgeList);
     }
 
     public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
@@ -490,12 +535,9 @@ public class StreamConfig implements Serializable {
     }
 
     public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> 
chainedTaskConfigs) {
-
-        try {
-            InstantiationUtil.writeObjectToConfig(
-                    chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
-        } catch (IOException e) {
-            throw new StreamTaskException("Could not serialize 
configuration.", e);
+        if (chainedTaskConfigs != null) {
+            chainedTaskConfigs.forEach(
+                    (id, config) -> chainedTaskFutures.put(id, 
config.getSerializationFuture()));
         }
     }
 
@@ -547,23 +589,13 @@ public class StreamConfig implements Serializable {
 
     public void setStateBackend(StateBackend backend) {
         if (backend != null) {
-            try {
-                InstantiationUtil.writeObjectToConfig(backend, this.config, 
STATE_BACKEND);
-                setStateBackendUsesManagedMemory(backend.useManagedMemory());
-            } catch (Exception e) {
-                throw new StreamTaskException("Could not serialize stateHandle 
provider.", e);
-            }
+            toBeSerializedConfigObjects.put(STATE_BACKEND, backend);
+            setStateBackendUsesManagedMemory(backend.useManagedMemory());
         }
     }
 
     public void setChangelogStateBackendEnabled(TernaryBoolean enabled) {
-        try {
-            InstantiationUtil.writeObjectToConfig(
-                    enabled, this.config, ENABLE_CHANGE_LOG_STATE_BACKEND);
-        } catch (Exception e) {
-            throw new StreamTaskException(
-                    "Could not serialize change log state backend enable 
flag.", e);
-        }
+        toBeSerializedConfigObjects.put(ENABLE_CHANGE_LOG_STATE_BACKEND, 
enabled);
     }
 
     @VisibleForTesting
@@ -591,11 +623,7 @@ public class StreamConfig implements Serializable {
 
     public void setSavepointDir(Path directory) {
         if (directory != null) {
-            try {
-                InstantiationUtil.writeObjectToConfig(directory, config, 
SAVEPOINT_DIR);
-            } catch (Exception e) {
-                throw new StreamTaskException("Could not serialize savepoint 
directory.", e);
-            }
+            toBeSerializedConfigObjects.put(SAVEPOINT_DIR, directory);
         }
     }
 
@@ -609,11 +637,7 @@ public class StreamConfig implements Serializable {
 
     public void setCheckpointStorage(CheckpointStorage storage) {
         if (storage != null) {
-            try {
-                InstantiationUtil.writeObjectToConfig(storage, config, 
CHECKPOINT_STORAGE);
-            } catch (Exception e) {
-                throw new StreamTaskException("Could not serialize checkpoint 
storage.", e);
-            }
+            toBeSerializedConfigObjects.put(CHECKPOINT_STORAGE, storage);
         }
     }
 
@@ -627,12 +651,7 @@ public class StreamConfig implements Serializable {
 
     public void setTimerServiceProvider(InternalTimeServiceManager.Provider 
timerServiceProvider) {
         if (timerServiceProvider != null) {
-            try {
-                InstantiationUtil.writeObjectToConfig(
-                        timerServiceProvider, this.config, 
TIMER_SERVICE_PROVIDER);
-            } catch (Exception e) {
-                throw new StreamTaskException("Could not serialize timer 
service provider.", e);
-            }
+            toBeSerializedConfigObjects.put(TIMER_SERVICE_PROVIDER, 
timerServiceProvider);
         }
     }
 
@@ -645,12 +664,7 @@ public class StreamConfig implements Serializable {
     }
 
     public void setStatePartitioner(int input, KeySelector<?, ?> partitioner) {
-        try {
-            InstantiationUtil.writeObjectToConfig(
-                    partitioner, this.config, STATE_PARTITIONER + input);
-        } catch (IOException e) {
-            throw new StreamTaskException("Could not serialize state 
partitioner.", e);
-        }
+        toBeSerializedConfigObjects.put(STATE_PARTITIONER + input, 
partitioner);
     }
 
     public <IN, K extends Serializable> KeySelector<IN, K> getStatePartitioner(
@@ -664,11 +678,7 @@ public class StreamConfig implements Serializable {
     }
 
     public void setStateKeySerializer(TypeSerializer<?> serializer) {
-        try {
-            InstantiationUtil.writeObjectToConfig(serializer, this.config, 
STATE_KEY_SERIALIZER);
-        } catch (IOException e) {
-            throw new StreamTaskException("Could not serialize state key 
serializer.", e);
-        }
+        toBeSerializedConfigObjects.put(STATE_KEY_SERIALIZER, serializer);
     }
 
     public <K> TypeSerializer<K> getStateKeySerializer(ClassLoader cl) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6c903274473..d83e5dfa098 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -54,6 +54,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
@@ -75,7 +76,10 @@ import 
org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -97,6 +101,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -113,12 +121,28 @@ public class StreamingJobGraphGenerator {
 
     // ------------------------------------------------------------------------
 
+    @VisibleForTesting
     public static JobGraph createJobGraph(StreamGraph streamGraph) {
-        return createJobGraph(streamGraph, null);
+        return new StreamingJobGraphGenerator(streamGraph, null, 
Runnable::run).createJobGraph();
     }
 
     public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable 
JobID jobID) {
-        return new StreamingJobGraphGenerator(streamGraph, 
jobID).createJobGraph();
+        // TODO Currently, we construct a new thread pool for the compilation 
of each job. In the
+        // future, we may refactor the job submission framework and make it 
reusable across jobs.
+        final ExecutorService serializationExecutor =
+                Executors.newFixedThreadPool(
+                        Math.max(
+                                1,
+                                Math.min(
+                                        Hardware.getNumberCPUCores(),
+                                        
streamGraph.getExecutionConfig().getParallelism())),
+                        new 
ExecutorThreadFactory("flink-operator-serialization-io"));
+        try {
+            return new StreamingJobGraphGenerator(streamGraph, jobID, 
serializationExecutor)
+                    .createJobGraph();
+        } finally {
+            serializationExecutor.shutdown();
+        }
     }
 
     // ------------------------------------------------------------------------
@@ -146,7 +170,13 @@ public class StreamingJobGraphGenerator {
 
     private boolean hasHybridResultPartition = false;
 
-    private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable 
JobID jobID) {
+    private final Executor serializationExecutor;
+
+    // Futures for the serialization of operator coordinators
+    private final List<CompletableFuture<Void>> 
coordinatorSerializationFutures = new ArrayList<>();
+
+    private StreamingJobGraphGenerator(
+            StreamGraph streamGraph, @Nullable JobID jobID, Executor 
serializationExecutor) {
         this.streamGraph = streamGraph;
         this.defaultStreamGraphHasher = new StreamGraphHasherV2();
         this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphUserHashHasher());
@@ -160,6 +190,7 @@ public class StreamingJobGraphGenerator {
         this.chainedPreferredResources = new HashMap<>();
         this.chainedInputOutputFormats = new HashMap<>();
         this.physicalEdgesInOrder = new ArrayList<>();
+        this.serializationExecutor = 
Preconditions.checkNotNull(serializationExecutor);
 
         jobGraph = new JobGraph(jobID, streamGraph.getJobName());
     }
@@ -225,6 +256,21 @@ public class StreamingJobGraphGenerator {
 
         setVertexDescription();
 
+        // Wait for the serialization of operator coordinators and stream 
config.
+        try {
+            FutureUtils.combineAll(
+                            vertexConfigs.values().stream()
+                                    .map(
+                                            config ->
+                                                    
config.triggerSerializationAndReturnFuture(
+                                                            
serializationExecutor))
+                                    .collect(Collectors.toList()))
+                    .get();
+            FutureUtils.combineAll(coordinatorSerializationFutures).get();
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Error in serialization.", e);
+        }
+
         return jobGraph;
     }
 
@@ -747,15 +793,21 @@ public class StreamingJobGraphGenerator {
 
         for (OperatorCoordinator.Provider coordinatorProvider :
                 chainInfo.getCoordinatorProviders()) {
-            try {
-                jobVertex.addOperatorCoordinator(new 
SerializedValue<>(coordinatorProvider));
-            } catch (IOException e) {
-                throw new FlinkRuntimeException(
-                        String.format(
-                                "Coordinator Provider for node %s is not 
serializable.",
-                                chainedNames.get(streamNodeId)),
-                        e);
-            }
+            coordinatorSerializationFutures.add(
+                    CompletableFuture.runAsync(
+                            () -> {
+                                try {
+                                    jobVertex.addOperatorCoordinator(
+                                            new 
SerializedValue<>(coordinatorProvider));
+                                } catch (IOException e) {
+                                    throw new FlinkRuntimeException(
+                                            String.format(
+                                                    "Coordinator Provider for 
node %s is not serializable.",
+                                                    
chainedNames.get(streamNodeId)),
+                                            e);
+                                }
+                            },
+                            serializationExecutor));
         }
 
         jobVertex.setResources(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
index a40cca389fe..c986ee2216a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -237,6 +237,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends 
TestLogger {
 
         cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
         cfg.setOperatorID(new OperatorID());
+        cfg.serializeAllConfigs();
 
         try {
             MockStreamTask mockTask =
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 11024a1f95a..2ec4989d1b2 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -145,6 +145,7 @@ public class InterruptSensitiveRestoreTest {
             case KEYED_RAW:
                 cfg.setStateKeySerializer(IntSerializer.INSTANCE);
                 cfg.setStreamOperator(new StreamSource<>(new 
TestSource(mode)));
+                cfg.serializeAllConfigs();
                 break;
             default:
                 throw new IllegalArgumentException();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index e091890055f..bc7bb15ae21 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -1014,7 +1014,8 @@ public class OneInputStreamTaskTest extends TestLogger {
         }
 
         streamConfig.setChainedOutputs(outputEdges);
-        streamConfig.setTransitiveChainedTaskConfigs(chainedTaskConfigs);
+        chainedTaskConfigs.values().forEach(StreamConfig::serializeAllConfigs);
+        
streamConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedTaskConfigs);
     }
 
     private static class IdentityKeySelector<IN> implements KeySelector<IN, 
IN> {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
index 57fd2f0fbf1..3e7f5e64e37 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -94,6 +94,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
         this.numInputChannelsPerGate = numInputChannelsPerGate;
 
         streamConfig.setStateKeySerializer(inputSerializer);
+        streamConfig.serializeAllConfigs();
     }
 
     /**
@@ -145,6 +146,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends 
StreamTaskTestHarnes
         ClosureCleaner.clean(keySelector, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
         streamConfig.setStatePartitioner(0, keySelector);
         
streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig));
+        streamConfig.serializeAllConfigs();
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
index a06839b30eb..55da7e7d8c7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java
@@ -83,7 +83,7 @@ public class OperatorChainTest {
             final StreamConfig cfg = new StreamConfig(new Configuration());
             cfg.setOperatorID(new OperatorID());
             cfg.setStateKeySerializer(new StringSerializer());
-
+            cfg.serializeAllConfigs();
             final List<StreamOperatorWrapper<?, ?>> operatorWrappers = new 
ArrayList<>();
 
             // initial output goes to nowhere
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
index f38952554b1..5efb7914744 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java
@@ -157,6 +157,7 @@ public class StreamConfigChainer<OWNER> {
                     ManagedMemoryUseCase.STATE_BACKEND, 1.0);
         }
         tailConfig.setChainIndex(chainIndex);
+        tailConfig.serializeAllConfigs();
 
         chainedConfigs.put(chainIndex, tailConfig);
 
@@ -191,8 +192,11 @@ public class StreamConfigChainer<OWNER> {
         tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs);
         tailConfig.setOutEdgesInOrder(outEdgesInOrder);
         tailConfig.setNonChainedOutputs(outEdgesInOrder);
-        headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
+        chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
+
+        headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
         headConfig.setOutEdgesInOrder(outEdgesInOrder);
+        headConfig.serializeAllConfigs();
 
         return owner;
     }
@@ -238,9 +242,11 @@ public class StreamConfigChainer<OWNER> {
         headConfig.setNumberOfOutputs(1);
         headConfig.setOutEdgesInOrder(outEdgesInOrder);
         headConfig.setNonChainedOutputs(outEdgesInOrder);
-        headConfig.setTransitiveChainedTaskConfigs(chainedConfigs);
+        chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs);
+        headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs);
         headConfig.setOutEdgesInOrder(outEdgesInOrder);
         headConfig.setTypeSerializerOut(outputSerializer);
+        headConfig.serializeAllConfigs();
 
         return owner;
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index 8ce2c827b62..299db5d10a3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -291,6 +291,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
         streamConfig.setInPhysicalEdges(inPhysicalEdges);
         streamConfig.setNumberOfNetworkInputs(inputGates.length);
         streamConfig.setInputs(inputs.toArray(new InputConfig[inputs.size()]));
+        streamConfig.serializeAllConfigs();
     }
 
     private void initializeNetworkInput(
@@ -363,9 +364,9 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
         sourceConfig.setTypeSerializerOut(sourceInput.getSourceSerializer());
         sourceConfig.setOperatorID(sourceInput.getOperatorId());
         
sourceConfig.setStreamOperatorFactory(sourceInput.getSourceOperatorFactory());
-
+        sourceConfig.serializeAllConfigs();
         transitiveChainedTaskConfigs.put(sourceToMainEdge.getSourceId(), 
sourceConfig);
-        
streamConfig.setTransitiveChainedTaskConfigs(transitiveChainedTaskConfigs);
+        
streamConfig.setAndSerializeTransitiveChainedTaskConfigs(transitiveChainedTaskConfigs);
         return new SourceInputConfig(sourceToMainEdge);
     }
 
@@ -420,6 +421,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
         checkState(!setupCalled, "This harness was already setup.");
         setupCalled = true;
         streamConfig.setStreamOperatorFactory(headOperatorFactory);
+        streamConfig.serializeAllConfigs();
 
         // There is always 1 default output other than the additional ones.
         return new StreamConfigChainer<>(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java
index b595648023b..346452d6506 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java
@@ -153,6 +153,7 @@ public class StreamTaskSystemExitTest extends TestLogger {
         streamConfig.setOperatorID(new OperatorID());
         streamConfig.setStreamOperator(operator);
         streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); 
// for source run
+        streamConfig.serializeAllConfigs();
 
         final JobInformation jobInformation =
                 new JobInformation(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 12676fb9cbc..6796d594bee 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -133,6 +133,7 @@ public class StreamTaskTerminationTest extends TestLogger {
         streamConfig.setStreamOperator(noOpStreamOperator);
         streamConfig.setOperatorID(new OperatorID());
         streamConfig.setStateBackend(blockingStateBackend);
+        streamConfig.serializeAllConfigs();
 
         final long checkpointId = 0L;
         final long checkpointTimestamp = 0L;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 7f86231f534..782344dfe20 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -438,6 +438,7 @@ public class StreamTaskTest extends TestLogger {
         cfg.setOperatorID(new OperatorID(4711L, 42L));
         cfg.setStreamOperator(new SlowlyDeserializingOperator());
         cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+        cfg.serializeAllConfigs();
 
         final TaskManagerActions taskManagerActions = spy(new 
NoOpTaskManagerActions());
         try (NettyShuffleEnvironment shuffleEnvironment =
@@ -1154,6 +1155,7 @@ public class StreamTaskTest extends TestLogger {
         StreamConfig streamConfig = new StreamConfig(taskConfiguration);
         streamConfig.setStreamOperator(new StreamMap<>(value -> value));
         streamConfig.setOperatorID(new OperatorID());
+        streamConfig.serializeAllConfigs();
         try (MockEnvironment mockEnvironment =
                 new 
MockEnvironmentBuilder().setTaskConfiguration(taskConfiguration).build()) {
 
@@ -1956,7 +1958,7 @@ public class StreamTaskTest extends TestLogger {
             Configuration taskManagerConfig,
             Executor executor)
             throws Exception {
-
+        taskConfig.serializeAllConfigs();
         return new TestTaskBuilder(shuffleEnvironment)
                 .setTaskManagerConfig(taskManagerConfig)
                 .setInvokable(invokable)
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 68094c43ba6..5a9930ef709 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -255,6 +255,7 @@ public class StreamTaskTestHarness<OUT> {
 
         streamConfig.setOutEdgesInOrder(outEdgesInOrder);
         streamConfig.setNonChainedOutputs(outEdgesInOrder);
+        streamConfig.serializeAllConfigs();
     }
 
     public StreamMockEnvironment createEnvironment() {
@@ -280,6 +281,7 @@ public class StreamTaskTestHarness<OUT> {
      * thread to finish running.
      */
     public Thread invoke() throws Exception {
+        streamConfig.serializeAllConfigs();
         return invoke(createEnvironment());
     }
 
@@ -295,6 +297,7 @@ public class StreamTaskTestHarness<OUT> {
 
         initializeInputs();
         initializeOutput();
+        streamConfig.serializeAllConfigs();
 
         taskThread = new TaskThread(() -> taskFactory.apply(mockEnv));
         taskThread.start();
@@ -508,6 +511,7 @@ public class StreamTaskTestHarness<OUT> {
         setupCalled = true;
         StreamConfig streamConfig = getStreamConfig();
         streamConfig.setStreamOperatorFactory(headOperatorFactory);
+        streamConfig.serializeAllConfigs();
         return new StreamConfigChainer(headOperatorId, streamConfig, this, 1);
     }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 07b80d14790..6a2a8efcbd8 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -191,6 +191,7 @@ public class TaskCheckpointingBehaviourTest extends 
TestLogger {
         cfg.setStreamOperator(op);
         cfg.setOperatorID(new OperatorID());
         cfg.setStateBackend(backend);
+        cfg.serializeAllConfigs();
 
         ExecutionConfig executionConfig = new ExecutionConfig();
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
index dce7aee4b98..d45603e1ea6 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java
@@ -77,6 +77,7 @@ public class AbstractStreamOperatorTestHarnessTest extends 
TestLogger {
                 new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0)) {
 
             result.config.setStateKeySerializer(IntSerializer.INSTANCE);
+            result.config.serializeAllConfigs();
 
             Time timeToLive = Time.hours(1);
             result.initializeState(OperatorSubtaskState.builder().build());
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java
index ab059b2d197..cd60ef12056 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java
@@ -49,6 +49,7 @@ public class KeyedBroadcastOperatorTestHarness<K, IN1, IN2, 
OUT>
         ClosureCleaner.clean(keySelector, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
         config.setStatePartitioner(0, keySelector);
         
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+        config.serializeAllConfigs();
     }
 
     public <KS, V> BroadcastState<KS, V> getBroadcastState(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedMultiInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedMultiInputStreamOperatorTestHarness.java
index 0c8cb44f9e9..704bd27fe7b 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedMultiInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedMultiInputStreamOperatorTestHarness.java
@@ -39,6 +39,7 @@ public class KeyedMultiInputStreamOperatorTestHarness<KEY, 
OUT>
             StreamOperatorFactory<OUT> operator, TypeInformation<KEY> keyType) 
throws Exception {
         this(operator, 1, 1, 0);
         
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+        config.serializeAllConfigs();
     }
 
     public KeyedMultiInputStreamOperatorTestHarness(
@@ -53,5 +54,6 @@ public class KeyedMultiInputStreamOperatorTestHarness<KEY, 
OUT>
     public void setKeySelector(int idx, KeySelector<?, KEY> keySelector) {
         ClosureCleaner.clean(keySelector, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
         config.setStatePartitioner(idx, keySelector);
+        config.serializeAllConfigs();
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 5a0495244ee..0849fb418eb 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -67,6 +67,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
         ClosureCleaner.clean(keySelector, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
         config.setStatePartitioner(0, keySelector);
         
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+        config.serializeAllConfigs();
     }
 
     public KeyedOneInputStreamOperatorTestHarness(
@@ -96,6 +97,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
         ClosureCleaner.clean(keySelector, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
         config.setStatePartitioner(0, keySelector);
         
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+        config.serializeAllConfigs();
     }
 
     public int numKeyedStateEntries() {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 1dcc1e9f4cb..6b93aca59bc 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -50,6 +50,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, 
IN2, OUT>
         config.setStatePartitioner(0, keySelector1);
         config.setStatePartitioner(1, keySelector2);
         
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+        config.serializeAllConfigs();
     }
 
     public KeyedTwoInputStreamOperatorTestHarness(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
index 23b74c93adc..11453a61106 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java
@@ -66,5 +66,6 @@ public class MockStreamConfig extends StreamConfig {
         }
         setOutEdgesInOrder(outEdgesInOrder);
         setNonChainedOutputs(outEdgesInOrder);
+        serializeAllConfigs();
     }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index a65667d602e..3b92cf75f59 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -60,6 +60,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         this(operator, 1, 1, 0);
 
         
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
+        config.serializeAllConfigs();
     }
 
     public OneInputStreamOperatorTestHarness(
@@ -77,6 +78,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
                 subtaskIndex,
                 operatorID);
         
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
+        config.serializeAllConfigs();
     }
 
     public OneInputStreamOperatorTestHarness(
@@ -87,6 +89,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         this(operator, environment);
 
         
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
+        config.serializeAllConfigs();
     }
 
     public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> 
operator)
@@ -141,6 +144,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         this(factory, environment);
 
         
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
+        config.serializeAllConfigs();
     }
 
     public OneInputStreamOperatorTestHarness(
@@ -155,6 +159,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         this(factory, 1, 1, 0);
 
         
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
+        config.serializeAllConfigs();
     }
 
     public OneInputStreamOperatorTestHarness(
@@ -185,6 +190,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
         super(operator, taskName, operatorID);
 
         
config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn));
+        config.serializeAllConfigs();
     }
 
     @Override
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
index ba28e514fef..9fdcf651d42 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java
@@ -294,6 +294,7 @@ public abstract class MultipleInputStreamOperatorBase 
extends AbstractStreamOper
                         .toArray(TypeSerializer[]::new));
         streamConfig.setTypeSerializerOut(
                 wrapper.getOutputType().createSerializer(executionConfig));
+        streamConfig.serializeAllConfigs();
         return streamConfig;
     }
 

Reply via email to