kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r506181718



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -239,13 +261,28 @@ private void configureStreamGraph(final StreamGraph 
graph) {
                        
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
                        
graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
                        setDefaultBufferTimeout(-1);
+                       setBatchStateBackendAndTimerService(graph);
                } else {
                        
graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
                        
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
                        graph.setScheduleMode(ScheduleMode.EAGER);
                }
        }
 
+       private void setBatchStateBackendAndTimerService(StreamGraph graph) {
+               boolean useStateBackend = 
configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
+               boolean sortInputs = 
configuration.get(ExecutionOptions.SORT_INPUTS);
+               checkState(
+                       !useStateBackend || sortInputs,
+                       "Batch state backend requires the sorted inputs to be 
enabled!");
+
+               if (useStateBackend) {
+                       LOG.debug("Using BATCH execution state backend.");

Review comment:
       Nit: Can't we move 
https://github.com/apache/flink/pull/13647/files#diff-54c8fe1971ffb5aa55b3f829f43aa02c7765b62c397f0c943b4049a4fd1e3a62R253
 to the `else{}` block in lines 266...? I find it more clear than write and 
then overwrite.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
##########
@@ -239,13 +261,28 @@ private void configureStreamGraph(final StreamGraph 
graph) {
                        
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
                        
graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
                        setDefaultBufferTimeout(-1);
+                       setBatchStateBackendAndTimerService(graph);
                } else {
                        
graph.setAllVerticesInSameSlotSharingGroupByDefault(true);
                        
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
                        graph.setScheduleMode(ScheduleMode.EAGER);
                }
        }
 
+       private void setBatchStateBackendAndTimerService(StreamGraph graph) {
+               boolean useStateBackend = 
configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
+               boolean sortInputs = 
configuration.get(ExecutionOptions.SORT_INPUTS);
+               checkState(
+                       !useStateBackend || sortInputs,
+                       "Batch state backend requires the sorted inputs to be 
enabled!");
+
+               if (useStateBackend) {
+                       LOG.debug("Using BATCH execution state backend.");

Review comment:
       "Using BATCH execution state backend _and timer service_."




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to