This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9c32948 [FLINK-11653][DataStream] Add configuration to enforce custom UIDs on datastream 9c32948 is described below commit 9c329484f9e8da58b3f0ac4e48348ba3e781dd08 Author: Seth Wiesman <sjwies...@gmail.com> AuthorDate: Mon Mar 4 08:27:08 2019 -0600 [FLINK-11653][DataStream] Add configuration to enforce custom UIDs on datastream --- docs/ops/upgrading.md | 2 +- .../apache/flink/api/common/ExecutionConfig.java | 37 ++++++++++++++++++++++ .../streaming/api/graph/StreamGraphGenerator.java | 7 ++++ .../StreamingJobGraphGeneratorNodeHashTest.java | 34 ++++++++++++++++++++ 4 files changed, 79 insertions(+), 1 deletion(-) diff --git a/docs/ops/upgrading.md b/docs/ops/upgrading.md index 22cea4c..42c1835 100644 --- a/docs/ops/upgrading.md +++ b/docs/ops/upgrading.md @@ -75,7 +75,7 @@ val mappedEvents: DataStream[(Int, Long)] = events **Important:** As of 1.3.x this also applies to operators that are part of a chain. -By default all state stored in a savepoint must be matched to the operators of a starting application. However, users can explicitly agree to skip (and thereby discard) state that cannot be matched to an operator when starting a application from a savepoint. Stateful operators for which no state is found in the savepoint are initialized with their default state. +By default all state stored in a savepoint must be matched to the operators of a starting application. However, users can explicitly agree to skip (and thereby discard) state that cannot be matched to an operator when starting an application from a savepoint. Stateful operators for which no state is found in the savepoint are initialized with their default state. Users may enforce best practices by calling `ExecutionConfig#disableAutoGeneratedUIDs` which will fail the job submission if an [...] 
### Stateful Operators and User Functions diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index d288810..0baf8fd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -116,6 +116,8 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut /** Flag to indicate whether generic types (through Kryo) are supported */ private boolean disableGenericTypes = false; + private boolean enableAutoGeneratedUids = true; + private boolean objectReuse = false; private boolean autoTypeRegistrationEnabled = true; @@ -613,6 +615,41 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut } /** + * Enables the Flink runtime to auto-generate UIDs for operators. + * + * @see #disableAutoGeneratedUIDs() + */ + public void enableAutoGeneratedUIDs() { + enableAutoGeneratedUids = true; + } + + /** + * Disables auto-generated UIDs. Forces users to manually specify UIDs + * on DataStream applications. + * + * <p>It is highly recommended that users specify UIDs before deploying to + * production since they are used to match state in savepoints to operators + * in a job. Because auto-generated IDs are likely to change when modifying + * a job, specifying custom IDs allows an application to evolve over time + * without discarding state. + */ + public void disableAutoGeneratedUIDs() { + enableAutoGeneratedUids = false; + } + + /** + * Checks whether auto generated UIDs are supported. + * + * <p>Auto generated UIDs are enabled by default. + * + * @see #enableAutoGeneratedUIDs() + * @see #disableAutoGeneratedUIDs() + */ + public boolean hasAutoGeneratedUIDsEnabled() { + return enableAutoGeneratedUids; + } + + /** * Forces Flink to use the Apache Avro serializer for POJOs. 
* * <b>Important:</b> Make sure to include the <i>flink-avro</i> module. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index d2841fd..3fd43dc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -204,6 +204,13 @@ public class StreamGraphGenerator { streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash()); } + if (!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) { + if (transform.getUserProvidedNodeHash() == null && transform.getUid() == null) { + throw new IllegalStateException("Auto generated UIDs have been disabled " + + "but no UID or hash has been assigned to operator " + transform.getName()); + } + } + if (transform.getMinResources() != null && transform.getPreferredResources() != null) { streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources()); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java index 6dd08f6..3a44c17 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import 
org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -456,6 +457,39 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger { env.getStreamGraph().getJobGraph(); } + @Test(expected = IllegalStateException.class) + public void testDisablingAutoUidsFailsStreamGraphCreation() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + env.getConfig().disableAutoGeneratedUIDs(); + + env.addSource(new NoOpSourceFunction()).addSink(new DiscardingSink<>()); + env.getStreamGraph(); + } + + @Test + public void testDisablingAutoUidsAcceptsManuallySetId() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + env.getConfig().disableAutoGeneratedUIDs(); + + env + .addSource(new NoOpSourceFunction()).uid("uid1") + .addSink(new DiscardingSink<>()).uid("uid2"); + + env.getStreamGraph(); + } + + @Test + public void testDisablingAutoUidsAcceptsManuallySetHash() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); + env.getConfig().disableAutoGeneratedUIDs(); + + env + .addSource(new NoOpSourceFunction()).setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + .addSink(new DiscardingSink<>()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"); + + env.getStreamGraph(); + } + // ------------------------------------------------------------------------ /**