[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user uce closed the pull request at: https://github.com/apache/flink/pull/754 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/754#issuecomment-141400026 I'm afraid that the requirements might change. Without the scheduling changes, this will be dead code. I would like to revisit this when there is someone to work on the scheduling side.
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/754#issuecomment-138890100 @uce, what's the status of this PR? Has @tillrohrmann's PR that you are referring to been merged?
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/754#issuecomment-138910462 Why not add it now and have one more step done towards batch scheduling? Wouldn't it be duplicate work if this PR is put aside now and the internals change? But you're the expert here. If you think it's better to close the PR, go ahead ;-)
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/754#issuecomment-138907431 It's done on my side. The question is whether it makes sense to merge this w/o any plans to continue on the batch scheduling. My personal answer is no. I would close this and then re-open it when there is a need for it.
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/754#issuecomment-114176855 Addressed @tillrohrmann's comments and rebased on master. @tillrohrmann, should we merge with your upcoming changes or can we merge it before?
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/754#discussion_r31743017

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java ---
```diff
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.types.IntValue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.BitSet;
+
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class BatchSchedulingTest {
+
+	// Test cluster config
+	private final static int NUMBER_OF_TMS = 1;
+	private final static int NUMBER_OF_SLOTS_PER_TM = 2;
+	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+	private static TestingCluster flink;
+
+	@BeforeClass
+	public static void setUpTestCluster() throws Exception {
+		flink = TestingUtils.startTestingCluster(
+				NUMBER_OF_SLOTS_PER_TM,
+				NUMBER_OF_TMS,
+				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+	}
+
+	@AfterClass
+	public static void tearDownTestCluster() throws Exception {
+		if (flink != null) {
+			flink.stop();
+		}
+	}
+
+	@Test
+	public void testBatchSchedulingScheduleModeNotSet() throws Exception {
+		// Create the JobGraph
+		JobGraph jobGraph = new JobGraph("testBatchSchedulingScheduleModeNotSet");
+
+		AbstractJobVertex v1 = new AbstractJobVertex("v1");
+		v1.setInvokableClass(SourceTask.class);
+		v1.setParallelism(PARALLELISM);
+
+		AbstractJobVertex v2 = new AbstractJobVertex("v2");
+		v2.setInvokableClass(UnionForwarder.class);
+		v2.setParallelism(PARALLELISM);
+
+		v2.connectNewDataSetAsInput(v1, POINTWISE);
+
+		jobGraph.addVertex(v1);
+		jobGraph.addVertex(v2);
+
+		v1.setAsBatchSource();
+		v1.addBatchSuccessors(v2);
+
+		try {
+			// The execution should fail, because we configured a successor without
+			// setting the correct schedule mode.
+			flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+			fail("Did not throw expected Exception.");
+		}
+		catch (JobExecutionException expected) {
+			assertEquals(IllegalStateException.class, expected.getCause().getClass());
+		}
+	}
+
+	@Test(expected = JobExecutionException.class)
+	public void testBatchSchedulingNoSourceSet() throws Exception {
+		// Create the JobGraph
+		JobGraph jobGraph = new
```
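The first test above expects submission to fail (with an `IllegalStateException` as the cause of the `JobExecutionException`) when batch successors are configured but the schedule mode was left unset. A minimal sketch of such a check might look like the following, assuming a simplified vertex model: `BatchScheduleValidation`, `Vertex`, `validate` and the two-value `ScheduleMode` enum are hypothetical stand-ins, not Flink classes. The no-source check is only a guess at what `testBatchSchedulingNoSourceSet` exercises, since the quoted diff is cut off before that test's body.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the checks the BatchSchedulingTest cases
// appear to expect. None of these types are Flink classes.
public class BatchScheduleValidation {

    // Illustrative two-value enum; Flink's real ScheduleMode has other constants.
    enum ScheduleMode { FROM_SOURCES, BATCH_FROM_SOURCES }

    static final class Vertex {
        final String name;
        boolean batchSource;
        final List<Vertex> batchSuccessors = new ArrayList<>();

        Vertex(String name) {
            this.name = name;
        }

        void setAsBatchSource() {
            batchSource = true;
        }

        void addBatchSuccessors(Vertex... successors) {
            batchSuccessors.addAll(Arrays.asList(successors));
        }
    }

    // Rejects inconsistent batch configurations with an IllegalStateException.
    static void validate(ScheduleMode mode, List<Vertex> vertices) {
        boolean hasSuccessors = false;
        boolean hasSource = false;
        for (Vertex v : vertices) {
            hasSuccessors |= !v.batchSuccessors.isEmpty();
            hasSource |= v.batchSource;
        }
        if (hasSuccessors && mode != ScheduleMode.BATCH_FROM_SOURCES) {
            // Case exercised by testBatchSchedulingScheduleModeNotSet.
            throw new IllegalStateException(
                    "Batch successors are configured, but the schedule mode is " + mode);
        }
        if (mode == ScheduleMode.BATCH_FROM_SOURCES && !hasSource) {
            // Assumed analogue of testBatchSchedulingNoSourceSet.
            throw new IllegalStateException("BATCH_FROM_SOURCES requires at least one batch source");
        }
    }

    public static void main(String[] args) {
        Vertex v1 = new Vertex("v1");
        Vertex v2 = new Vertex("v2");
        v1.setAsBatchSource();
        v1.addBatchSuccessors(v2);

        try {
            validate(ScheduleMode.FROM_SOURCES, Arrays.asList(v1, v2));
            System.out.println("unexpected: configuration accepted");
        } catch (IllegalStateException expected) {
            System.out.println("rejected: " + expected.getMessage());
        }

        // The same graph with the matching schedule mode passes.
        validate(ScheduleMode.BATCH_FROM_SOURCES, Arrays.asList(v1, v2));
        System.out.println("accepted with BATCH_FROM_SOURCES");
    }
}
```

In the real cluster test the exception surfaces wrapped in a `JobExecutionException`; the sketch only shows where such a consistency check could sit.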
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/754#discussion_r31728131

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
```diff
@@ -588,23 +590,74 @@ void markFinished() {
 			if (current == RUNNING || current == DEPLOYING) {
+				ScheduleMode scheduleMode = getVertex().getExecutionGraph().getScheduleMode();
+
+				// Get the successors
+				final List<JobVertexID> successors = getVertex().getJobVertex()
+						.getJobVertex().getBatchSuccessors();
```
--- End diff --

Maybe these function names should be renamed, or the list of successors should be retrieved in two steps so that one sees the different types.
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/754#discussion_r31728385 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java ---
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/754#discussion_r31729748 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java ---
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/754#issuecomment-108843486 The hook that starts the next batch source is at the level of the `Execution`. That means that as soon as one execution is finished, the next leg would start. I think that would only work if the leg does not itself trigger multiple successors (because it has an intermediate shuffle). Otherwise, we would again immediately exceed the number of slots. Can we put the hook onto the intermediate result, and have it trigger an entire slot sharing group as soon as the result is fully available? My initial thought is that this would support the above case better.
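The difference between an execution-level and a result-level hook can be sketched as follows. This is a hypothetical, simplified model, not Flink code: `BlockingResultTrigger` and `markPartitionFinished` are illustrative names. Each producer subtask marks its partition as finished, and the callback (which would schedule the next slot sharing group) runs exactly once, after the last partition finishes, rather than once per finished execution. The `synchronized` method is a simplification of whatever concurrency control a real implementation would need.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch of a result-level hook: a blocking intermediate result
// that runs its callback once, after every producer partition has finished.
public class BlockingResultTrigger {

    private final int numPartitions;
    private final BitSet finished;
    private final Runnable onFullyAvailable;
    private boolean triggered;

    BlockingResultTrigger(int numPartitions, Runnable onFullyAvailable) {
        this.numPartitions = numPartitions;
        this.finished = new BitSet(numPartitions);
        this.onFullyAvailable = onFullyAvailable;
    }

    // Called when the producer subtask writing partition `index` finishes.
    synchronized void markPartitionFinished(int index) {
        if (index < 0 || index >= numPartitions) {
            throw new IndexOutOfBoundsException("No partition " + index);
        }
        finished.set(index);
        // Fire only once, and only when the whole result is available.
        if (!triggered && finished.cardinality() == numPartitions) {
            triggered = true;
            onFullyAvailable.run();
        }
    }

    public static void main(String[] args) {
        List<String> scheduled = new ArrayList<>();
        BlockingResultTrigger result =
                new BlockingResultTrigger(3, () -> scheduled.add("group(union, sink)"));

        result.markPartitionFinished(0);
        result.markPartitionFinished(2);
        System.out.println(scheduled); // prints []

        result.markPartitionFinished(1);
        System.out.println(scheduled); // prints [group(union, sink)]

        result.markPartitionFinished(1); // a duplicate notification does not re-trigger
        System.out.println(scheduled.size()); // prints 1
    }
}
```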
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user uce commented on the pull request: https://github.com/apache/flink/pull/754#issuecomment-108847772 I'm not sure if this is what you mean: the triggering happens at the level of the intermediate result, not the execution. The code just happens to live in the execution. (This is what I meant when we had an offline chat about the way intermediate result notifications are handled. It is somewhat of a mess.) To the second point: yes, I think it is possible to trigger a SlotSharingGroup instead of a JobExecutionVertex. But would it make a difference for the result? The JobExecutionVertex would grab the slot, and the successors would simply grab their subslots when they are in the same slot sharing group, right?
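To make the slot arithmetic behind this question concrete, here is a hypothetical model of slot sharing. It is not Flink's `SlotSharingGroup` implementation, only the common simplification: subtask *i* of every vertex in a group shares slot *i*, so a group needs as many slots as its most parallel vertex, while vertices in separate groups need their own slots. Under that assumption, deploying `union` and `sink` (parallelism 2 each) in one group needs 2 slots, which fits the test cluster's `PARALLELISM` of 2. Without sharing, the same two vertices would need 4.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of slot sharing (not Flink's SlotSharingGroup code):
// subtask i of every vertex in a group shares slot i, so a group needs as
// many slots as its most parallel vertex.
public class SlotSharingEstimate {

    // parallelism: vertex name -> parallelism
    // groupOf: vertex name -> slot sharing group; vertices without an entry
    // are treated as their own group.
    static int slotsNeeded(Map<String, Integer> parallelism, Map<String, String> groupOf) {
        Map<String, Integer> slotsPerGroup = new HashMap<>();
        for (Map.Entry<String, Integer> e : parallelism.entrySet()) {
            String group = groupOf.getOrDefault(e.getKey(), e.getKey());
            slotsPerGroup.merge(group, e.getValue(), Integer::max);
        }
        int total = 0;
        for (int slots : slotsPerGroup.values()) {
            total += slots;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        parallelism.put("union", 2);
        parallelism.put("sink", 2);

        Map<String, String> shared = new HashMap<>();
        shared.put("union", "union-sink");
        shared.put("sink", "union-sink");

        System.out.println(slotsNeeded(parallelism, shared));          // prints 2
        System.out.println(slotsNeeded(parallelism, new HashMap<>())); // prints 4
    }
}
```

In this model, triggering the first vertex or the whole group consumes the same number of slots as long as all successors belong to that group. Stephan's concern arises when a leg's downstream vertices land in a different group.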
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/754#discussion_r31716498 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java ---
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
Github user uce commented on a diff in the pull request: https://github.com/apache/flink/pull/754#discussion_r31716687 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java ---
[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...
GitHub user uce opened a pull request: https://github.com/apache/flink/pull/754

[FLINK-2119] Add ExecutionGraph support for batch scheduling

This PR adds support for a newly introduced scheduling mode `BATCH_FROM_SOURCES`. My goal was to make this change *minimally invasive*, so as not to touch too much core code shortly before the release. Essentially, this only touches two parts of the codebase: the scheduling action for blocking results and the job vertices.

If you set the scheduling mode to `BATCH_FROM_SOURCES`, you can manually configure which input vertices are used as the sources when scheduling (`setAsBatchSource`). You can then manually specify the successor vertices (`addBatchSuccessors`), which are scheduled after the blocking results are finished. When no successors are specified manually, the result consumers are scheduled as before. Mixing pipelined and blocking results currently leads to unspecified behaviour (i.e. it's not a good idea to do this at the moment).

When you have something like this:

```
  O     sink
  |                 . - denotes a pipelined result
  O     union
+´|`+
| | |
┆ ┆ ┆               --- denotes a blocking result
O O O   src0 src1 src2
```

You can first schedule `src0`, `src1` and `src2`, and then continue with the `union-sink` pipeline:

```java
src[0].setAsBatchSource();         // src0 is the first to go...
src[0].addBatchSuccessors(src[1]); // src0 => src1
src[1].addBatchSuccessors(src[2]); // src1 => src2
src[2].addBatchSuccessors(union);  // src2 => [union => sink]
```

@StephanEwen or @tillrohrmann will work on the Optimizer/JobGraph counterpart of this and will build the `JobGraph` for programs in batch mode using the methods introduced in this PR.

Do you guys think that this minimal support is sufficient for the first version?

(Going over the result partition notification code, I really think it's pressing to refactor it. It is very, very hard to understand.
The corresponding issue [FLINK-1833](https://issues.apache.org/jira/browse/FLINK-1833) was created a while back. I want to do this after the release.)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/incubator-flink legs-2119

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/754.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #754

commit 4ac15982700257d3deb2d55a389afd0531f7f8be
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-06-01T21:12:47Z

    [FLINK-2119] Add ExecutionGraph support for batch scheduling
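The deployment order described in this PR can be simulated in a few lines. This is a hypothetical model, not the PR's code: `BatchFromSourcesOrder`, `Vertex`, `pipelinedConsumers` and `stages` are illustrative names (the `addBatchSuccessors` method only mirrors the PR's naming). Blocking edges are left out because, in this model, the batch-successor chain alone decides when a vertex is deployed, and the fallback of scheduling result consumers when no successors are set is not modelled. A stage is a vertex plus everything reachable from it over pipelined edges, and its batch successors form the next stage.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of the BATCH_FROM_SOURCES order; not the PR's code.
public class BatchFromSourcesOrder {

    static final class Vertex {
        final String name;
        final List<Vertex> pipelinedConsumers = new ArrayList<>();
        final List<Vertex> batchSuccessors = new ArrayList<>();

        Vertex(String name) {
            this.name = name;
        }

        void addBatchSuccessors(Vertex... successors) {
            batchSuccessors.addAll(Arrays.asList(successors));
        }
    }

    // Returns the deployment stages in order, starting at the batch source.
    static List<List<String>> stages(Vertex batchSource) {
        List<List<String>> result = new ArrayList<>();
        Set<Vertex> deployed = new HashSet<>();
        Deque<Vertex> pending = new ArrayDeque<>();
        pending.add(batchSource);

        while (!pending.isEmpty()) {
            Vertex start = pending.poll();
            if (!deployed.add(start)) {
                continue; // already deployed in an earlier stage
            }
            // A stage is the start vertex plus everything it feeds via pipelined results.
            List<String> stage = new ArrayList<>();
            Deque<Vertex> pipeline = new ArrayDeque<>();
            pipeline.add(start);
            while (!pipeline.isEmpty()) {
                Vertex v = pipeline.poll();
                stage.add(v.name);
                for (Vertex consumer : v.pipelinedConsumers) {
                    if (deployed.add(consumer)) {
                        pipeline.add(consumer);
                    }
                }
                // Batch successors wait until this stage's blocking results are finished.
                pending.addAll(v.batchSuccessors);
            }
            result.add(stage);
        }
        return result;
    }

    public static void main(String[] args) {
        Vertex src0 = new Vertex("src0");
        Vertex src1 = new Vertex("src1");
        Vertex src2 = new Vertex("src2");
        Vertex union = new Vertex("union");
        Vertex sink = new Vertex("sink");
        union.pipelinedConsumers.add(sink);

        src0.addBatchSuccessors(src1);  // src0 => src1
        src1.addBatchSuccessors(src2);  // src1 => src2
        src2.addBatchSuccessors(union); // src2 => [union => sink]

        System.out.println(stages(src0)); // prints [[src0], [src1], [src2], [union, sink]]
    }
}
```

Running the sources one after another keeps at most one leg's worth of slots busy at a time, which is the point of the manual successor chain.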