This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c32948  [FLINK-11653][DataStream] Add configuration to enforce custom 
UIDs on datastream
9c32948 is described below

commit 9c329484f9e8da58b3f0ac4e48348ba3e781dd08
Author: Seth Wiesman <sjwies...@gmail.com>
AuthorDate: Mon Mar 4 08:27:08 2019 -0600

    [FLINK-11653][DataStream] Add configuration to enforce custom UIDs on 
datastream
---
 docs/ops/upgrading.md                              |  2 +-
 .../apache/flink/api/common/ExecutionConfig.java   | 37 ++++++++++++++++++++++
 .../streaming/api/graph/StreamGraphGenerator.java  |  7 ++++
 .../StreamingJobGraphGeneratorNodeHashTest.java    | 34 ++++++++++++++++++++
 4 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/docs/ops/upgrading.md b/docs/ops/upgrading.md
index 22cea4c..42c1835 100644
--- a/docs/ops/upgrading.md
+++ b/docs/ops/upgrading.md
@@ -75,7 +75,7 @@ val mappedEvents: DataStream[(Int, Long)] = events
 
 **Important:** As of 1.3.x this also applies to operators that are part of a 
chain.
 
-By default all state stored in a savepoint must be matched to the operators of 
a starting application. However, users can explicitly agree to skip (and 
thereby discard) state that cannot be matched to an operator when starting a 
application from a savepoint. Stateful operators for which no state is found in 
the savepoint are initialized with their default state.
+By default all state stored in a savepoint must be matched to the operators of 
a starting application. However, users can explicitly agree to skip (and 
thereby discard) state that cannot be matched to an operator when starting an 
application from a savepoint. Stateful operators for which no state is found in 
the savepoint are initialized with their default state. Users may enforce best 
practices by calling `ExecutionConfig#disableAutoGeneratedUIDs`, which will fail 
the job submission if an [...]
 
 ### Stateful Operators and User Functions
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java 
b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index d288810..0baf8fd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -116,6 +116,8 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        /** Flag to indicate whether generic types (through Kryo) are supported 
*/
        private boolean disableGenericTypes = false;
 
+       private boolean enableAutoGeneratedUids = true;
+
        private boolean objectReuse = false;
 
        private boolean autoTypeRegistrationEnabled = true;
@@ -613,6 +615,41 @@ public class ExecutionConfig implements Serializable, 
Archiveable<ArchivedExecut
        }
 
        /**
+        * Enables the Flink runtime to auto-generate UIDs for operators.
+        *
+        * @see #disableAutoGeneratedUIDs()
+        */
+       public void enableAutoGeneratedUIDs() {
+               enableAutoGeneratedUids = true;
+       }
+
+       /**
+        * Disables auto-generated UIDs. Forces users to manually specify UIDs
+        * on DataStream applications.
+        *
+        * <p>It is highly recommended that users specify UIDs before deploying 
to
+        * production since they are used to match state in savepoints to 
operators
+        * in a job. Because auto-generated IDs are likely to change when 
modifying
+        * a job, specifying custom IDs allows an application to evolve over time
+        * without discarding state.
+        */
+       public void  disableAutoGeneratedUIDs() {
+               enableAutoGeneratedUids = false;
+       }
+
+       /**
+        * Checks whether auto-generated UIDs are enabled.
+        *
+        * <p>Auto-generated UIDs are enabled by default.
+        *
+        * @see #enableAutoGeneratedUIDs()
+        * @see #disableAutoGeneratedUIDs()
+        */
+       public boolean hasAutoGeneratedUIDsEnabled() {
+               return enableAutoGeneratedUids;
+       }
+
+       /**
         * Forces Flink to use the Apache Avro serializer for POJOs.
         *
         * <b>Important:</b> Make sure to include the <i>flink-avro</i> module.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index d2841fd..3fd43dc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -204,6 +204,13 @@ public class StreamGraphGenerator {
                        
streamGraph.setTransformationUserHash(transform.getId(), 
transform.getUserProvidedNodeHash());
                }
 
+               if 
(!streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()) {
+                       if (transform.getUserProvidedNodeHash() == null && 
transform.getUid() == null) {
+                               throw new IllegalStateException("Auto generated 
UIDs have been disabled " +
+                                       "but no UID or hash has been assigned 
to operator " + transform.getName());
+                       }
+               }
+
                if (transform.getMinResources() != null && 
transform.getPreferredResources() != null) {
                        streamGraph.setResources(transform.getId(), 
transform.getMinResources(), transform.getPreferredResources());
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 6dd08f6..3a44c17 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -456,6 +457,39 @@ public class StreamingJobGraphGeneratorNodeHashTest 
extends TestLogger {
                env.getStreamGraph().getJobGraph();
        }
 
+       @Test(expected = IllegalStateException.class)
+       public void testDisablingAutoUidsFailsStreamGraphCreation() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+               env.getConfig().disableAutoGeneratedUIDs();
+
+               env.addSource(new NoOpSourceFunction()).addSink(new 
DiscardingSink<>());
+               env.getStreamGraph();
+       }
+
+       @Test
+       public void testDisablingAutoUidsAcceptsManuallySetId() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+               env.getConfig().disableAutoGeneratedUIDs();
+
+               env
+                       .addSource(new NoOpSourceFunction()).uid("uid1")
+                       .addSink(new DiscardingSink<>()).uid("uid2");
+
+               env.getStreamGraph();
+       }
+
+       @Test
+       public void testDisablingAutoUidsAcceptsManuallySetHash() {
+               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
+               env.getConfig().disableAutoGeneratedUIDs();
+
+               env
+                       .addSource(new 
NoOpSourceFunction()).setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
+                       .addSink(new 
DiscardingSink<>()).setUidHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb");
+
+               env.getStreamGraph();
+       }
+
        // 
------------------------------------------------------------------------
 
        /**

Reply via email to