This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8e0bb5e947a [FLINK-26675][runtime] Parallelized heavy serialization in StreamingJobGraphGenerator 8e0bb5e947a is described below commit 8e0bb5e947a6b4f09ca1df68c820b9f45425d411 Author: Yangze Guo <karma...@gmail.com> AuthorDate: Wed Mar 16 15:56:28 2022 +0800 [FLINK-26675][runtime] Parallelized heavy serialization in StreamingJobGraphGenerator This closes #19108. --- .../flink/state/api/BootstrapTransformation.java | 1 + .../state/api/StateBootstrapTransformation.java | 1 + .../flink/streaming/api/graph/StreamConfig.java | 174 +++++++++++---------- .../api/graph/StreamingJobGraphGenerator.java | 76 +++++++-- .../StreamSourceOperatorLatencyMetricsTest.java | 1 + .../tasks/InterruptSensitiveRestoreTest.java | 1 + .../runtime/tasks/OneInputStreamTaskTest.java | 3 +- .../tasks/OneInputStreamTaskTestHarness.java | 2 + .../streaming/runtime/tasks/OperatorChainTest.java | 2 +- .../runtime/tasks/StreamConfigChainer.java | 10 +- .../tasks/StreamTaskMailboxTestHarnessBuilder.java | 6 +- .../runtime/tasks/StreamTaskSystemExitTest.java | 1 + .../runtime/tasks/StreamTaskTerminationTest.java | 1 + .../streaming/runtime/tasks/StreamTaskTest.java | 4 +- .../runtime/tasks/StreamTaskTestHarness.java | 4 + .../tasks/TaskCheckpointingBehaviourTest.java | 1 + .../AbstractStreamOperatorTestHarnessTest.java | 1 + .../util/KeyedBroadcastOperatorTestHarness.java | 1 + .../KeyedMultiInputStreamOperatorTestHarness.java | 2 + .../KeyedOneInputStreamOperatorTestHarness.java | 2 + .../KeyedTwoInputStreamOperatorTestHarness.java | 1 + .../flink/streaming/util/MockStreamConfig.java | 1 + .../util/OneInputStreamOperatorTestHarness.java | 6 + .../MultipleInputStreamOperatorBase.java | 1 + 24 files changed, 202 insertions(+), 101 deletions(-) diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java index 66f75f3a1c7..a58fb1e5174 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java @@ -235,6 +235,7 @@ public class BootstrapTransformation<T> { // This means leaving this stateBackend unwrapped. config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE); config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0); + config.serializeAllConfigs(); return config; } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java index 615067c1241..64a898df1ce 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java @@ -223,6 +223,7 @@ public class StateBootstrapTransformation<T> { // This means leaving this stateBackend unwrapped. config.setChangelogStateBackendEnabled(TernaryBoolean.FALSE); config.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.STATE_BACKEND, 1.0); + config.serializeAllConfigs(); return config; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index b1bc3d633a0..8c85b4343d2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -44,6 +44,7 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TernaryBoolean; +import org.apache.flink.util.concurrent.FutureUtils; import java.io.IOException; import java.io.Serializable; @@ -54,6 +55,9 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Function; import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; @@ -132,6 +136,15 @@ public class StreamConfig implements Serializable { private final Configuration config; + // To make the parallelization of the StreamConfig serialization easier, we use this map + // to collect all the need-to-be-serialized objects. These objects will be serialized all at + // once then. + private final transient Map<String, Object> toBeSerializedConfigObjects = new HashMap<>(); + private final transient Map<Integer, CompletableFuture<StreamConfig>> chainedTaskFutures = + new HashMap<>(); + private final transient CompletableFuture<StreamConfig> serializationFuture = + new CompletableFuture<>(); + public StreamConfig(Configuration config) { this.config = config; } @@ -140,6 +153,67 @@ public class StreamConfig implements Serializable { return config; } + public CompletableFuture<StreamConfig> getSerializationFuture() { + return serializationFuture; + } + + /** Trigger the object config serialization and return the completable future. */ + public CompletableFuture<StreamConfig> triggerSerializationAndReturnFuture( + Executor ioExecutor) { + FutureUtils.combineAll(chainedTaskFutures.values()) + .thenAcceptAsync( + chainedConfigs -> { + // Serialize all the objects to config. + serializeAllConfigs(); + + try { + InstantiationUtil.writeObjectToConfig( + chainedConfigs.stream() + .collect( + Collectors.toMap( + StreamConfig::getVertexID, + Function.identity())), + this.config, + CHAINED_TASK_CONFIG); + } catch (IOException e) { + throw new StreamTaskException( + "Could not serialize object for key chained task config.", + e); + } + serializationFuture.complete(this); + }, + ioExecutor); + return serializationFuture; + } + + /** + * Serialize all object configs synchronously. Only used for operators which need to reconstruct + * the StreamConfig internally or test. + */ + public void serializeAllConfigs() { + toBeSerializedConfigObjects.forEach( + (key, object) -> { + try { + InstantiationUtil.writeObjectToConfig(object, this.config, key); + } catch (IOException e) { + throw new StreamTaskException( + String.format("Could not serialize object for key %s.", key), e); + } + }); + } + + @VisibleForTesting + public void setAndSerializeTransitiveChainedTaskConfigs( + Map<Integer, StreamConfig> chainedTaskConfigs) { + try { + InstantiationUtil.writeObjectToConfig( + chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG); + } catch (IOException e) { + throw new StreamTaskException( + "Could not serialize object for key chained task config.", e); + } + } + // ------------------------------------------------------------------------ // Configured Properties // ------------------------------------------------------------------------ @@ -232,11 +306,7 @@ public class StreamConfig implements Serializable { } private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) { - try { - InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key); - } catch (IOException e) { - throw new StreamTaskException("Could not serialize type serializer.", e); - } + toBeSerializedConfigObjects.put(key, typeWrapper); } public <T> TypeSerializer<T> getTypeSerializerSideOut(OutputTag<?> outputTag, ClassLoader cl) { @@ -258,11 +328,7 @@ public class StreamConfig implements Serializable { } public void setInputs(InputConfig... inputs) { - try { - InstantiationUtil.writeObjectToConfig(inputs, this.config, INPUTS); - } catch (IOException e) { - throw new StreamTaskException("Could not serialize inputs.", e); - } + toBeSerializedConfigObjects.put(INPUTS, inputs); } public InputConfig[] getInputs(ClassLoader cl) { @@ -304,12 +370,7 @@ public class StreamConfig implements Serializable { public void setStreamOperatorFactory(StreamOperatorFactory<?> factory) { if (factory != null) { - try { - InstantiationUtil.writeObjectToConfig(factory, this.config, SERIALIZEDUDF); - } catch (IOException e) { - throw new StreamTaskException( - "Cannot serialize operator object " + factory.getClass() + ".", e); - } + toBeSerializedConfigObjects.put(SERIALIZEDUDF, factory); } } @@ -374,11 +435,7 @@ public class StreamConfig implements Serializable { } public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) { - try { - InstantiationUtil.writeObjectToConfig(outputvertexIDs, this.config, NONCHAINED_OUTPUTS); - } catch (IOException e) { - throw new StreamTaskException("Cannot serialize non chained outputs.", e); - } + toBeSerializedConfigObjects.put(NONCHAINED_OUTPUTS, outputvertexIDs); } public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) { @@ -392,11 +449,7 @@ public class StreamConfig implements Serializable { } public void setChainedOutputs(List<StreamEdge> chainedOutputs) { - try { - InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, CHAINED_OUTPUTS); - } catch (IOException e) { - throw new StreamTaskException("Cannot serialize chained outputs.", e); - } + toBeSerializedConfigObjects.put(CHAINED_OUTPUTS, chainedOutputs); } public List<StreamEdge> getChainedOutputs(ClassLoader cl) { @@ -410,11 +463,7 @@ public class StreamConfig implements Serializable { } public void setInPhysicalEdges(List<StreamEdge> inEdges) { - try { - InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES); - } catch (IOException e) { - throw new StreamTaskException("Cannot serialize inward edges.", e); - } + toBeSerializedConfigObjects.put(IN_STREAM_EDGES, inEdges); } public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) { @@ -472,11 +521,7 @@ public class StreamConfig implements Serializable { } public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) { - try { - InstantiationUtil.writeObjectToConfig(outEdgeList, this.config, EDGES_IN_ORDER); - } catch (IOException e) { - throw new StreamTaskException("Could not serialize outputs in order.", e); - } + toBeSerializedConfigObjects.put(EDGES_IN_ORDER, outEdgeList); } public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) { @@ -490,12 +535,9 @@ public class StreamConfig implements Serializable { } public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) { - - try { - InstantiationUtil.writeObjectToConfig( - chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG); - } catch (IOException e) { - throw new StreamTaskException("Could not serialize configuration.", e); + if (chainedTaskConfigs != null) { + chainedTaskConfigs.forEach( + (id, config) -> chainedTaskFutures.put(id, config.getSerializationFuture())); } } @@ -547,23 +589,13 @@ public class StreamConfig implements Serializable { public void setStateBackend(StateBackend backend) { if (backend != null) { - try { - InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND); - setStateBackendUsesManagedMemory(backend.useManagedMemory()); - } catch (Exception e) { - throw new StreamTaskException("Could not serialize stateHandle provider.", e); - } + toBeSerializedConfigObjects.put(STATE_BACKEND, backend); + setStateBackendUsesManagedMemory(backend.useManagedMemory()); } } public void setChangelogStateBackendEnabled(TernaryBoolean enabled) { - try { - InstantiationUtil.writeObjectToConfig( - enabled, this.config, ENABLE_CHANGE_LOG_STATE_BACKEND); - } catch (Exception e) { - throw new StreamTaskException( - "Could not serialize change log state backend enable flag.", e); - } + toBeSerializedConfigObjects.put(ENABLE_CHANGE_LOG_STATE_BACKEND, enabled); } @VisibleForTesting @@ -591,11 +623,7 @@ public class StreamConfig implements Serializable { public void setSavepointDir(Path directory) { if (directory != null) { - try { - InstantiationUtil.writeObjectToConfig(directory, config, SAVEPOINT_DIR); - } catch (Exception e) { - throw new StreamTaskException("Could not serialize savepoint directory.", e); - } + toBeSerializedConfigObjects.put(SAVEPOINT_DIR, directory); } } @@ -609,11 +637,7 @@ public class StreamConfig implements Serializable { public void setCheckpointStorage(CheckpointStorage storage) { if (storage != null) { - try { - InstantiationUtil.writeObjectToConfig(storage, config, CHECKPOINT_STORAGE); - } catch (Exception e) { - throw new StreamTaskException("Could not serialize checkpoint storage.", e); - } + toBeSerializedConfigObjects.put(CHECKPOINT_STORAGE, storage); } } @@ -627,12 +651,7 @@ public class StreamConfig implements Serializable { public void setTimerServiceProvider(InternalTimeServiceManager.Provider timerServiceProvider) { if (timerServiceProvider != null) { - try { - InstantiationUtil.writeObjectToConfig( - timerServiceProvider, this.config, TIMER_SERVICE_PROVIDER); - } catch (Exception e) { - throw new StreamTaskException("Could not serialize timer service provider.", e); - } + toBeSerializedConfigObjects.put(TIMER_SERVICE_PROVIDER, timerServiceProvider); } } @@ -645,12 +664,7 @@ public class StreamConfig implements Serializable { } public void setStatePartitioner(int input, KeySelector<?, ?> partitioner) { - try { - InstantiationUtil.writeObjectToConfig( - partitioner, this.config, STATE_PARTITIONER + input); - } catch (IOException e) { - throw new StreamTaskException("Could not serialize state partitioner.", e); - } + toBeSerializedConfigObjects.put(STATE_PARTITIONER + input, partitioner); } public <IN, K extends Serializable> KeySelector<IN, K> getStatePartitioner( @@ -664,11 +678,7 @@ public class StreamConfig implements Serializable { } public void setStateKeySerializer(TypeSerializer<?> serializer) { - try { - InstantiationUtil.writeObjectToConfig(serializer, this.config, STATE_KEY_SERIALIZER); - } catch (IOException e) { - throw new StreamTaskException("Could not serialize state key serializer.", e); - } + toBeSerializedConfigObjects.put(STATE_KEY_SERIALIZER, serializer); } public <K> TypeSerializer<K> getStateKeySerializer(ClassLoader cl) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 6c903274473..d83e5dfa098 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.util.Hardware; import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook; @@ -75,7 +76,10 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -97,6 +101,10 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -113,12 +121,28 @@ public class StreamingJobGraphGenerator { // ------------------------------------------------------------------------ + @VisibleForTesting public static JobGraph createJobGraph(StreamGraph streamGraph) { - return createJobGraph(streamGraph, null); + return new StreamingJobGraphGenerator(streamGraph, null, Runnable::run).createJobGraph(); } public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) { - return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(); + // TODO Currently, we construct a new thread pool for the compilation of each job. In the + // future, we may refactor the job submission framework and make it reusable across jobs. + final ExecutorService serializationExecutor = + Executors.newFixedThreadPool( + Math.max( + 1, + Math.min( + Hardware.getNumberCPUCores(), + streamGraph.getExecutionConfig().getParallelism())), + new ExecutorThreadFactory("flink-operator-serialization-io")); + try { + return new StreamingJobGraphGenerator(streamGraph, jobID, serializationExecutor) + .createJobGraph(); + } finally { + serializationExecutor.shutdown(); + } } // ------------------------------------------------------------------------ @@ -146,7 +170,13 @@ public class StreamingJobGraphGenerator { private boolean hasHybridResultPartition = false; - private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) { + private final Executor serializationExecutor; + + // Futures for the serialization of operator coordinators + private final List<CompletableFuture<Void>> coordinatorSerializationFutures = new ArrayList<>(); + + private StreamingJobGraphGenerator( + StreamGraph streamGraph, @Nullable JobID jobID, Executor serializationExecutor) { this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher()); @@ -160,6 +190,7 @@ public class StreamingJobGraphGenerator { this.chainedPreferredResources = new HashMap<>(); this.chainedInputOutputFormats = new HashMap<>(); this.physicalEdgesInOrder = new ArrayList<>(); + this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor); jobGraph = new JobGraph(jobID, streamGraph.getJobName()); } @@ -225,6 +256,21 @@ public class StreamingJobGraphGenerator { setVertexDescription(); + // Wait for the serialization of operator coordinators and stream config. + try { + FutureUtils.combineAll( + vertexConfigs.values().stream() + .map( + config -> + config.triggerSerializationAndReturnFuture( + serializationExecutor)) + .collect(Collectors.toList())) + .get(); + FutureUtils.combineAll(coordinatorSerializationFutures).get(); + } catch (Exception e) { + throw new FlinkRuntimeException("Error in serialization.", e); + } + return jobGraph; } @@ -747,15 +793,21 @@ public class StreamingJobGraphGenerator { for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) { - try { - jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider)); - } catch (IOException e) { - throw new FlinkRuntimeException( - String.format( - "Coordinator Provider for node %s is not serializable.", - chainedNames.get(streamNodeId)), - e); - } + coordinatorSerializationFutures.add( + CompletableFuture.runAsync( + () -> { + try { + jobVertex.addOperatorCoordinator( + new SerializedValue<>(coordinatorProvider)); + } catch (IOException e) { + throw new FlinkRuntimeException( + String.format( + "Coordinator Provider for node %s is not serializable.", + chainedNames.get(streamNodeId)), + e); + } + }, + serializationExecutor)); } jobVertex.setResources( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java index a40cca389fe..c986ee2216a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java @@ -237,6 +237,7 @@ public class StreamSourceOperatorLatencyMetricsTest extends TestLogger { cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); cfg.setOperatorID(new OperatorID()); + cfg.serializeAllConfigs(); try { MockStreamTask mockTask = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 11024a1f95a..2ec4989d1b2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -145,6 +145,7 @@ public class InterruptSensitiveRestoreTest { case KEYED_RAW: cfg.setStateKeySerializer(IntSerializer.INSTANCE); cfg.setStreamOperator(new StreamSource<>(new TestSource(mode))); + cfg.serializeAllConfigs(); break; default: throw new IllegalArgumentException(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java index e091890055f..bc7bb15ae21 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java @@ -1014,7 +1014,8 @@ public class OneInputStreamTaskTest extends TestLogger { } streamConfig.setChainedOutputs(outputEdges); - streamConfig.setTransitiveChainedTaskConfigs(chainedTaskConfigs); + chainedTaskConfigs.values().forEach(StreamConfig::serializeAllConfigs); + streamConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedTaskConfigs); } private static class IdentityKeySelector<IN> implements KeySelector<IN, IN> { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java index 57fd2f0fbf1..3e7f5e64e37 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java @@ -94,6 +94,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes this.numInputChannelsPerGate = numInputChannelsPerGate; streamConfig.setStateKeySerializer(inputSerializer); + streamConfig.serializeAllConfigs(); } /** @@ -145,6 +146,7 @@ public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarnes ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); streamConfig.setStatePartitioner(0, keySelector); streamConfig.setStateKeySerializer(keyType.createSerializer(executionConfig)); + streamConfig.serializeAllConfigs(); } @Override diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java index a06839b30eb..55da7e7d8c7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OperatorChainTest.java @@ -83,7 +83,7 @@ public class OperatorChainTest { final StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setOperatorID(new OperatorID()); cfg.setStateKeySerializer(new StringSerializer()); - + cfg.serializeAllConfigs(); final List<StreamOperatorWrapper<?, ?>> operatorWrappers = new ArrayList<>(); // initial output goes to nowhere diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java index f38952554b1..5efb7914744 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamConfigChainer.java @@ -157,6 +157,7 @@ public class StreamConfigChainer<OWNER> { ManagedMemoryUseCase.STATE_BACKEND, 1.0); } tailConfig.setChainIndex(chainIndex); + tailConfig.serializeAllConfigs(); chainedConfigs.put(chainIndex, tailConfig); @@ -191,8 +192,11 @@ public class StreamConfigChainer<OWNER> { tailConfig.setNumberOfOutputs(numberOfNonChainedOutputs); tailConfig.setOutEdgesInOrder(outEdgesInOrder); tailConfig.setNonChainedOutputs(outEdgesInOrder); - headConfig.setTransitiveChainedTaskConfigs(chainedConfigs); + chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs); + + headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs); headConfig.setOutEdgesInOrder(outEdgesInOrder); + headConfig.serializeAllConfigs(); return owner; } @@ -238,9 +242,11 @@ public class StreamConfigChainer<OWNER> { headConfig.setNumberOfOutputs(1); headConfig.setOutEdgesInOrder(outEdgesInOrder); headConfig.setNonChainedOutputs(outEdgesInOrder); - headConfig.setTransitiveChainedTaskConfigs(chainedConfigs); + chainedConfigs.values().forEach(StreamConfig::serializeAllConfigs); + headConfig.setAndSerializeTransitiveChainedTaskConfigs(chainedConfigs); headConfig.setOutEdgesInOrder(outEdgesInOrder); headConfig.setTypeSerializerOut(outputSerializer); + headConfig.serializeAllConfigs(); return owner; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java index 8ce2c827b62..299db5d10a3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java @@ -291,6 +291,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> { streamConfig.setInPhysicalEdges(inPhysicalEdges); streamConfig.setNumberOfNetworkInputs(inputGates.length); streamConfig.setInputs(inputs.toArray(new InputConfig[inputs.size()])); + streamConfig.serializeAllConfigs(); } private void initializeNetworkInput( @@ -363,9 +364,9 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> { sourceConfig.setTypeSerializerOut(sourceInput.getSourceSerializer()); sourceConfig.setOperatorID(sourceInput.getOperatorId()); sourceConfig.setStreamOperatorFactory(sourceInput.getSourceOperatorFactory()); - + sourceConfig.serializeAllConfigs(); transitiveChainedTaskConfigs.put(sourceToMainEdge.getSourceId(), sourceConfig); - streamConfig.setTransitiveChainedTaskConfigs(transitiveChainedTaskConfigs); + streamConfig.setAndSerializeTransitiveChainedTaskConfigs(transitiveChainedTaskConfigs); return new SourceInputConfig(sourceToMainEdge); } @@ -420,6 +421,7 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> { checkState(!setupCalled, "This harness was already setup."); setupCalled = true; streamConfig.setStreamOperatorFactory(headOperatorFactory); + streamConfig.serializeAllConfigs(); // There is always 1 default output other than the additional ones. return new StreamConfigChainer<>( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java index b595648023b..346452d6506 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java @@ -153,6 +153,7 @@ public class StreamTaskSystemExitTest extends TestLogger { streamConfig.setOperatorID(new OperatorID()); streamConfig.setStreamOperator(operator); streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); // for source run + streamConfig.serializeAllConfigs(); final JobInformation jobInformation = new JobInformation( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java index 12676fb9cbc..6796d594bee 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -133,6 +133,7 @@ public class StreamTaskTerminationTest extends TestLogger { streamConfig.setStreamOperator(noOpStreamOperator); streamConfig.setOperatorID(new OperatorID()); streamConfig.setStateBackend(blockingStateBackend); + streamConfig.serializeAllConfigs(); final long checkpointId = 0L; final long checkpointTimestamp = 0L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 7f86231f534..782344dfe20 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -438,6 +438,7 @@ public class StreamTaskTest extends TestLogger { cfg.setOperatorID(new OperatorID(4711L, 42L)); cfg.setStreamOperator(new SlowlyDeserializingOperator()); cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); + cfg.serializeAllConfigs(); final TaskManagerActions taskManagerActions = spy(new NoOpTaskManagerActions()); try (NettyShuffleEnvironment shuffleEnvironment = @@ -1154,6 +1155,7 @@ public class StreamTaskTest extends TestLogger { StreamConfig streamConfig = new StreamConfig(taskConfiguration); streamConfig.setStreamOperator(new StreamMap<>(value -> value)); streamConfig.setOperatorID(new OperatorID()); + streamConfig.serializeAllConfigs(); try (MockEnvironment mockEnvironment = new MockEnvironmentBuilder().setTaskConfiguration(taskConfiguration).build()) { @@ -1956,7 +1958,7 @@ public class StreamTaskTest extends TestLogger { Configuration taskManagerConfig, Executor executor) throws Exception { - + taskConfig.serializeAllConfigs(); return new TestTaskBuilder(shuffleEnvironment) .setTaskManagerConfig(taskManagerConfig) .setInvokable(invokable) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index 68094c43ba6..5a9930ef709 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -255,6 +255,7 @@ public class StreamTaskTestHarness<OUT> { streamConfig.setOutEdgesInOrder(outEdgesInOrder); streamConfig.setNonChainedOutputs(outEdgesInOrder); + streamConfig.serializeAllConfigs(); } public StreamMockEnvironment createEnvironment() { @@ -280,6 +281,7 @@ public class StreamTaskTestHarness<OUT> { * thread to finish running. */ public Thread invoke() throws Exception { + streamConfig.serializeAllConfigs(); return invoke(createEnvironment()); } @@ -295,6 +297,7 @@ public class StreamTaskTestHarness<OUT> { initializeInputs(); initializeOutput(); + streamConfig.serializeAllConfigs(); taskThread = new TaskThread(() -> taskFactory.apply(mockEnv)); taskThread.start(); @@ -508,6 +511,7 @@ public class StreamTaskTestHarness<OUT> { setupCalled = true; StreamConfig streamConfig = getStreamConfig(); streamConfig.setStreamOperatorFactory(headOperatorFactory); + streamConfig.serializeAllConfigs(); return new StreamConfigChainer(headOperatorId, streamConfig, this, 1); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java index 07b80d14790..6a2a8efcbd8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java @@ -191,6 +191,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger { cfg.setStreamOperator(op); cfg.setOperatorID(new OperatorID()); cfg.setStateBackend(backend); + cfg.serializeAllConfigs(); ExecutionConfig executionConfig = new ExecutionConfig(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java index dce7aee4b98..d45603e1ea6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.java @@ -77,6 +77,7 @@ public class AbstractStreamOperatorTestHarnessTest extends TestLogger { new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0)) { result.config.setStateKeySerializer(IntSerializer.INSTANCE); + result.config.serializeAllConfigs(); Time timeToLive = Time.hours(1); result.initializeState(OperatorSubtaskState.builder().build()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java index ab059b2d197..cd60ef12056 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedBroadcastOperatorTestHarness.java @@ -49,6 +49,7 @@ public class KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); config.setStatePartitioner(0, keySelector); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.serializeAllConfigs(); } public <KS, V> BroadcastState<KS, V> getBroadcastState( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedMultiInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedMultiInputStreamOperatorTestHarness.java index 0c8cb44f9e9..704bd27fe7b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedMultiInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedMultiInputStreamOperatorTestHarness.java @@ -39,6 +39,7 @@ public class KeyedMultiInputStreamOperatorTestHarness<KEY, OUT> StreamOperatorFactory<OUT> operator, TypeInformation<KEY> keyType) throws Exception { this(operator, 1, 1, 0); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.serializeAllConfigs(); } public KeyedMultiInputStreamOperatorTestHarness( @@ -53,5 +54,6 @@ public class KeyedMultiInputStreamOperatorTestHarness<KEY, OUT> public void setKeySelector(int idx, KeySelector<?, KEY> keySelector) { ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); config.setStatePartitioner(idx, keySelector); + config.serializeAllConfigs(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 5a0495244ee..0849fb418eb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -67,6 +67,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); config.setStatePartitioner(0, keySelector); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.serializeAllConfigs(); } public KeyedOneInputStreamOperatorTestHarness( @@ -96,6 +97,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false); config.setStatePartitioner(0, keySelector); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.serializeAllConfigs(); } public int numKeyedStateEntries() { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index 1dcc1e9f4cb..6b93aca59bc 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -50,6 +50,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> config.setStatePartitioner(0, keySelector1); config.setStatePartitioner(1, keySelector2); config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.serializeAllConfigs(); } public KeyedTwoInputStreamOperatorTestHarness( diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java index 23b74c93adc..11453a61106 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockStreamConfig.java @@ -66,5 +66,6 @@ public class MockStreamConfig extends StreamConfig { } setOutEdgesInOrder(outEdgesInOrder); setNonChainedOutputs(outEdgesInOrder); + serializeAllConfigs(); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index a65667d602e..3b92cf75f59 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -60,6 +60,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> this(operator, 1, 1, 0); config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); + config.serializeAllConfigs(); } public OneInputStreamOperatorTestHarness( @@ -77,6 +78,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> subtaskIndex, operatorID); config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); + config.serializeAllConfigs(); } public OneInputStreamOperatorTestHarness( @@ -87,6 +89,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> this(operator, environment); config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); + config.serializeAllConfigs(); } public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) @@ -141,6 +144,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> this(factory, environment); config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); + config.serializeAllConfigs(); } public OneInputStreamOperatorTestHarness( @@ -155,6 +159,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> this(factory, 1, 1, 0); config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); + config.serializeAllConfigs(); } public OneInputStreamOperatorTestHarness( @@ -185,6 +190,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> super(operator, taskName, operatorID); config.setupNetworkInputs(Preconditions.checkNotNull(typeSerializerIn)); + config.serializeAllConfigs(); } @Override diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java index ba28e514fef..9fdcf651d42 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/multipleinput/MultipleInputStreamOperatorBase.java @@ -294,6 +294,7 @@ public abstract class MultipleInputStreamOperatorBase extends AbstractStreamOper .toArray(TypeSerializer[]::new)); streamConfig.setTypeSerializerOut( wrapper.getOutputType().createSerializer(executionConfig)); + streamConfig.serializeAllConfigs(); return streamConfig; }