zentol commented on a change in pull request #14767: URL: https://github.com/apache/flink/pull/14767#discussion_r565269327
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorBatchExecutionTest.java ##########
@@ -105,6 +107,7 @@ public void testOneInputTransformation() {
         assertThat(graph.getStateBackend(), instanceOf(BatchExecutionStateBackend.class));
         // the provider is passed as a lambda therefore we cannot assert the class of the provider
         assertThat(graph.getTimerServiceProvider(), notNullValue());
+        assertThat(graph.getJobType(), is(JobType.BATCH));

Review comment:
I'm not really sold on this shotgun-style assertion approach. I'd think that this would be fine for a separate test:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Integer> process =
        env.fromElements(1, 2).keyBy(Integer::intValue).process(DUMMY_PROCESS_FUNCTION);
DataStreamSink<Integer> sink = process.addSink(new DiscardingSink<>());
StreamGraphGenerator graphGenerator =
        new StreamGraphGenerator(
                Collections.singletonList(sink.getTransformation()),
                env.getConfig(),
                env.getCheckpointConfig());
graphGenerator.setRuntimeExecutionMode(RuntimeExecutionMode.BATCH);
StreamGraph graph = graphGenerator.generate();
assertThat(graph.getJobType(), is(JobType.BATCH));
```

########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java ##########
@@ -751,6 +760,8 @@ public void testPartitionTypesInBatchMode() {
         assertThat(
                 verticesSorted.get(4) /* forward - sink */,
                 hasOutputPartitionType(ResultPartitionType.PIPELINED_BOUNDED));
+
+        assertEquals(JobType.BATCH, jobGraph.getJobType());

Review comment:
A separate test similar to the STREAMING case would be good. Is there maybe a test somewhere for `StreamGraphGenerator#shouldExecuteInBatchMode` that we could extend?
Otherwise, I'd prefer a test like this:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.fromElements("test").addSink(new DiscardingSink<>());
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
assertEquals(JobType.BATCH, jobGraph.getJobType());
```

########## File path: flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java ##########
@@ -94,6 +95,7 @@ private JobGraph createJobGraph(int numRecordsToSend) {
         StreamGraph streamGraph = env.getStreamGraph();
         streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
         streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+        streamGraph.setJobType(JobType.BATCH);

Review comment:
Hmm... maybe we could save ourselves some trouble by implicitly setting the job type if certain exchange modes / schedule modes are set. In any case, a comment would be good to clarify which of the two options (or both) results in the job being considered a batch one.

########## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java ##########
@@ -70,6 +71,7 @@ public static void setBatchProperties(StreamGraph streamGraph, TableConfig table
         streamGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
         streamGraph.setStateBackend(null);
         streamGraph.setCheckpointStorage(null);
+        streamGraph.setJobType(JobType.BATCH);

Review comment:
Let's move this right next to the schedule mode, and add a comment that this depends on the schedule mode.
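A rough sketch of what implicitly deriving the job type from the schedule mode could look like, so that call sites cannot set the two inconsistently. Everything here is hypothetical and self-contained: `GraphSketch`, `deriveJobType`, and the stand-in enums only mirror names from the diff (the `EAGER` constant is an assumption), and this is not Flink's `StreamGraph` API. The rule "any lazy-from-sources schedule mode implies `BATCH`" is also an assumption; whether `GlobalDataExchangeMode.ALL_EDGES_BLOCKING` should count as well is exactly the open question raised in the `BlockingShuffleITCase` comment.

```java
// Hypothetical stand-ins mirroring names from the diff; NOT Flink's actual classes.
enum ScheduleMode { EAGER, LAZY_FROM_SOURCES, LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST }

enum JobType { STREAMING, BATCH }

class JobTypeSketch {

    /** Hypothetical stand-in for StreamGraph, reduced to the two fields discussed here. */
    static final class GraphSketch {
        private ScheduleMode scheduleMode = ScheduleMode.EAGER;
        private JobType jobType = JobType.STREAMING;

        void setScheduleMode(ScheduleMode scheduleMode) {
            this.scheduleMode = scheduleMode;
            // The job type depends on the schedule mode, so it is derived here
            // instead of requiring a separate setJobType(...) call.
            this.jobType = deriveJobType(scheduleMode);
        }

        ScheduleMode getScheduleMode() {
            return scheduleMode;
        }

        JobType getJobType() {
            return jobType;
        }
    }

    /** Assumed rule: any lazy-from-sources schedule mode marks the job as BATCH. */
    static JobType deriveJobType(ScheduleMode scheduleMode) {
        switch (scheduleMode) {
            case LAZY_FROM_SOURCES:
            case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
                return JobType.BATCH;
            default:
                return JobType.STREAMING;
        }
    }

    public static void main(String[] args) {
        GraphSketch graph = new GraphSketch();
        graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
        System.out.println(graph.getJobType()); // prints "BATCH"
    }
}
```

With something along these lines, the explicit `setJobType(JobType.BATCH)` calls in `BlockingShuffleITCase` and `ExecutorUtils` would become unnecessary; the trade-off is that the coupling between schedule mode and job type becomes implicit, so it would still deserve a comment at the setter.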