[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-09-18 Thread uce
Github user uce closed the pull request at:

https://github.com/apache/flink/pull/754


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-09-18 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/754#issuecomment-141400026
  
I'm afraid that the requirements might change. Without the scheduling 
changes, this will be dead code. I would like to revisit this when there is 
someone to work on the scheduling side.




[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-09-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/754#issuecomment-138890100
  
@uce, what's the status of this PR? Has @tillrohrmann's PR that you are 
referring to been merged?




[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-09-09 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/754#issuecomment-138910462
  
Why not add it now and have one more step done towards batch scheduling?
Wouldn't it be duplicate work if this PR is put aside now and the internals 
change?

But you're the expert here. If you think it's better to close the PR, go ahead 
;-)




[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-09-09 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/754#issuecomment-138907431
  
It's done on my side. The question is whether it makes sense to merge this 
without any plans to continue working on batch scheduling. My personal answer is no. I 
would close this and then re-open it when there is a need for it.




[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-22 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/754#issuecomment-114176855
  
Addressed @tillrohrmann's comments and rebased on master. @tillrohrmann, 
should we merge this together with your upcoming changes, or can we merge it before them?




[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/754#discussion_r31743017
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java
 ---
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.types.IntValue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.BitSet;
+
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING;
+import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
+import static org.apache.flink.runtime.jobgraph.DistributionPattern.POINTWISE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class BatchSchedulingTest {
+
+	// Test cluster config
+	private final static int NUMBER_OF_TMS = 1;
+	private final static int NUMBER_OF_SLOTS_PER_TM = 2;
+	private final static int PARALLELISM = NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM;
+
+	private static TestingCluster flink;
+
+	@BeforeClass
+	public static void setUpTestCluster() throws Exception {
+		flink = TestingUtils.startTestingCluster(
+				NUMBER_OF_SLOTS_PER_TM,
+				NUMBER_OF_TMS,
+				TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+	}
+
+	@AfterClass
+	public static void tearDownTestCluster() throws Exception {
+		if (flink != null) {
+			flink.stop();
+		}
+	}
+
+	@Test
+	public void testBatchSchedulingScheduleModeNotSet() throws Exception {
+		// Create the JobGraph
+		JobGraph jobGraph = new JobGraph("testBatchSchedulingScheduleModeNotSet");
+
+		AbstractJobVertex v1 = new AbstractJobVertex("v1");
+		v1.setInvokableClass(SourceTask.class);
+		v1.setParallelism(PARALLELISM);
+
+		AbstractJobVertex v2 = new AbstractJobVertex("v2");
+		v2.setInvokableClass(UnionForwarder.class);
+		v2.setParallelism(PARALLELISM);
+
+		v2.connectNewDataSetAsInput(v1, POINTWISE);
+
+		jobGraph.addVertex(v1);
+		jobGraph.addVertex(v2);
+
+		v1.setAsBatchSource();
+		v1.addBatchSuccessors(v2);
+
+		try {
+			// The execution should fail, because we configured a successor without
+			// setting the correct schedule mode.
+			flink.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
+			fail("Did not throw expected Exception.");
+		}
+		catch (JobExecutionException expected) {
+			assertEquals(IllegalStateException.class, expected.getCause().getClass());
+		}
+	}
+
+	@Test(expected = JobExecutionException.class)
+	public void testBatchSchedulingNoSourceSet() throws Exception {
+		// Create the JobGraph
+		JobGraph jobGraph = new 

[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/754#discussion_r31728131
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -588,23 +590,74 @@ void markFinished() {
 
if (current == RUNNING || current == DEPLOYING) {
 
+	ScheduleMode scheduleMode = getVertex().getExecutionGraph().getScheduleMode();
+
+	// Get the successors
+	final List<JobVertexID> successors = getVertex().getJobVertex()
+			.getJobVertex().getBatchSuccessors();
--- End diff --

Maybe these function names should be renamed then, or the list of 
successors should be retrieved in two steps, so that one sees the different types.
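A minimal sketch of the two-step retrieval suggested above. It uses hypothetical stand-in classes (`ExecutionVertexModel`, `ExecutionJobVertexModel`, `JobVertexModel`), not Flink's real `ExecutionVertex`, `ExecutionJobVertex` and `AbstractJobVertex`; only the shape of the lookup is meant to carry over:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for AbstractJobVertex: the JobGraph-level vertex
// on which batch successors are configured.
final class JobVertexModel {
    private final List<String> batchSuccessors;

    JobVertexModel(List<String> batchSuccessors) {
        this.batchSuccessors = batchSuccessors;
    }

    List<String> getBatchSuccessors() {
        return batchSuccessors;
    }
}

// Hypothetical stand-in for ExecutionJobVertex: wraps the JobGraph-level vertex.
final class ExecutionJobVertexModel {
    private final JobVertexModel jobVertex;

    ExecutionJobVertexModel(JobVertexModel jobVertex) {
        this.jobVertex = jobVertex;
    }

    JobVertexModel getJobVertex() {
        return jobVertex;
    }
}

// Hypothetical stand-in for ExecutionVertex: one parallel subtask.
final class ExecutionVertexModel {
    private final ExecutionJobVertexModel jobVertex;

    ExecutionVertexModel(ExecutionJobVertexModel jobVertex) {
        this.jobVertex = jobVertex;
    }

    ExecutionJobVertexModel getJobVertex() {
        return jobVertex;
    }
}

final class SuccessorLookup {

    static List<String> batchSuccessorsOf(ExecutionVertexModel vertex) {
        // Step 1: from the subtask to the runtime-level job vertex.
        ExecutionJobVertexModel executionJobVertex = vertex.getJobVertex();
        // Step 2: from the runtime-level vertex to the JobGraph vertex that
        // holds the configured batch successors.
        JobVertexModel jobGraphVertex = executionJobVertex.getJobVertex();
        return Collections.unmodifiableList(jobGraphVertex.getBatchSuccessors());
    }

    public static void main(String[] args) {
        ExecutionVertexModel v1Subtask = new ExecutionVertexModel(
                new ExecutionJobVertexModel(new JobVertexModel(Arrays.asList("v2"))));
        System.out.println(batchSuccessorsOf(v1Subtask)); // prints [v2]
    }
}
```

Naming each intermediate variable by its type makes it visible that the two `getJobVertex()` calls return different kinds of vertices, which is the ambiguity the comment points at.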




[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/754#discussion_r31728385
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java
 ---

[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-04 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/754#discussion_r31729748
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java
 ---

[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-04 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/754#issuecomment-108843486
  
The hook that starts the next batch source is at the level of the 
`Execution`. That means that as soon as one execution is finished, the next leg 
would start.

I think that would only work if the leg does not itself trigger 
multiple successors (because it has an intermediate shuffle). Otherwise, we 
would again immediately exceed the number of slots.

Can we put the hook onto the intermediate result, and have it trigger an 
entire slot sharing group as soon as it is fully available? My initial 
thought is that this would support the above case better.
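One way to read the proposal above: the intermediate result itself tracks which of its partitions are finished and starts a slot sharing group exactly once, when the last partition completes. The sketch is hypothetical (`IntermediateResultHookSketch` and the `scheduleGroup` callback are not Flink APIs) and ignores failures and restarts:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a blocking intermediate result that triggers a whole
// slot sharing group once all of its partitions are finished.
final class IntermediateResultHookSketch {

    private final int numberOfPartitions;
    private final String slotSharingGroup;
    private final Consumer<String> scheduleGroup; // hypothetical scheduler callback
    private final BitSet finishedPartitions = new BitSet();
    private boolean triggered;

    IntermediateResultHookSketch(int numberOfPartitions, String slotSharingGroup,
            Consumer<String> scheduleGroup) {
        this.numberOfPartitions = numberOfPartitions;
        this.slotSharingGroup = slotSharingGroup;
        this.scheduleGroup = scheduleGroup;
    }

    // Called when the producing subtask with the given index has finished its partition.
    synchronized void markPartitionFinished(int partitionIndex) {
        if (partitionIndex < 0 || partitionIndex >= numberOfPartitions) {
            throw new IllegalArgumentException("Unknown partition " + partitionIndex);
        }
        finishedPartitions.set(partitionIndex); // repeated notifications are idempotent
        // Fire only once, and only when the result is fully available.
        if (!triggered && finishedPartitions.cardinality() == numberOfPartitions) {
            triggered = true;
            scheduleGroup.accept(slotSharingGroup);
        }
    }

    public static void main(String[] args) {
        List<String> scheduled = new ArrayList<>();
        IntermediateResultHookSketch result =
                new IntermediateResultHookSketch(3, "union-sink", scheduled::add);
        result.markPartitionFinished(0);
        result.markPartitionFinished(2);
        System.out.println(scheduled); // prints []
        result.markPartitionFinished(1);
        System.out.println(scheduled); // prints [union-sink]
    }
}
```

By contrast, a hook attached to each `Execution` would fire as soon as the first producer finished, which is how the next leg could start too early.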




[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-04 Thread uce
Github user uce commented on the pull request:

https://github.com/apache/flink/pull/754#issuecomment-108847772
  
I'm not sure if this is what you mean: the triggering happens at the 
level of the intermediate result, not the execution. The code just happens 
to live in the execution. (This is what I meant when we had an offline chat about 
the way intermediate result notifications are handled. It is somewhat of 
a mess.)

Regarding the second point: yes, I think it is possible to trigger a 
SlotSharingGroup instead of a JobExecutionVertex. But would it make a 
difference for the result? The JobExecutionVertex would grab the slot and the 
successors would simply grab their subslots when they are in the same slot 
sharing group, right?
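The subslot argument can be illustrated with a small hypothetical model (not Flink's scheduler or `SlotSharingGroup` classes): the first vertex of a group takes one slot per subtask index, and later vertices of the same group only add subslots to it. The cluster size of two slots mirrors `NUMBER_OF_TMS * NUMBER_OF_SLOTS_PER_TM` from the test:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of slot sharing: one shared slot per (group, subtask index).
final class SlotSharingSketch {

    private int freeSlots;
    // Number of subslots in use per shared slot, keyed by "group#subtaskIndex".
    private final Map<String, Integer> subslotsInUse = new HashMap<>();

    SlotSharingSketch(int totalSlots) {
        this.freeSlots = totalSlots;
    }

    // Schedules all subtasks of one vertex; returns false (and changes nothing)
    // if the vertex would need more new slots than are free.
    boolean scheduleVertex(String group, int parallelism) {
        int newSlotsNeeded = 0;
        for (int i = 0; i < parallelism; i++) {
            if (!subslotsInUse.containsKey(group + "#" + i)) {
                newSlotsNeeded++;
            }
        }
        if (newSlotsNeeded > freeSlots) {
            return false;
        }
        for (int i = 0; i < parallelism; i++) {
            // The first vertex of the group opens the shared slot; successors add a subslot.
            subslotsInUse.merge(group + "#" + i, 1, Integer::sum);
        }
        freeSlots -= newSlotsNeeded;
        return true;
    }

    int freeSlots() {
        return freeSlots;
    }

    public static void main(String[] args) {
        SlotSharingSketch cluster = new SlotSharingSketch(2);
        System.out.println(cluster.scheduleVertex("union-sink", 2)); // union: true
        System.out.println(cluster.scheduleVertex("union-sink", 2)); // sink: true, subslots only
        System.out.println(cluster.freeSlots());                     // 0
        System.out.println(cluster.scheduleVertex("other", 2));      // false: needs new slots
    }
}
```

In this model, scheduling the first vertex and then its successors occupies the same number of slots as triggering the whole group at once; what runs out of slots is a second group started concurrently.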






[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-04 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/754#discussion_r31716498
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java
 ---

[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-04 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/754#discussion_r31716687
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/BatchSchedulingTest.java
 ---

[GitHub] flink pull request: [FLINK-2119] Add ExecutionGraph support for ba...

2015-06-01 Thread uce
GitHub user uce opened a pull request:

https://github.com/apache/flink/pull/754

[FLINK-2119] Add ExecutionGraph support for batch scheduling

This PR adds support for a newly introduced scheduling mode 
`BATCH_FROM_SOURCES`. My goal was to make this change *minimally 
invasive*, in order not to touch too much core code shortly before the release.

Essentially, this only touches two parts of the codebase: the scheduling 
action for blocking results and the job vertices.

If you set the scheduling mode to `BATCH_FROM_SOURCES`, you can manually 
configure which input vertices are used as the sources when scheduling 
(`setAsBatchSource`). You can then manually specify the successor vertices 
(`addBatchSuccessors`), which are scheduled after the blocking results are 
finished. When no successors are specified manually, the result consumers 
are scheduled as before. Mixing pipelined and blocking results currently leads to 
unspecified behaviour (i.e., it's not a good idea to do this at the 
moment).
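The rule described above can be summarized as a small decision function. This is a hypothetical sketch, not the code in `Execution#markFinished`: `ScheduleModeSketch`, `BatchSuccessorDecision` and the string vertex IDs are stand-ins. The `IllegalStateException` mirrors what `testBatchSchedulingScheduleModeNotSet` expects when successors are configured without `BATCH_FROM_SOURCES`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for ScheduleMode; only the values needed for the sketch.
enum ScheduleModeSketch { FROM_SOURCES, BATCH_FROM_SOURCES }

final class BatchSuccessorDecision {

    // Returns the vertices to schedule after a vertex's blocking result is finished.
    static List<String> verticesToSchedule(
            ScheduleModeSketch mode,
            List<String> batchSuccessors,   // configured via addBatchSuccessors
            List<String> resultConsumers) { // consumers of the finished result

        if (batchSuccessors.isEmpty()) {
            // No manual successors: schedule the result consumers as before.
            return Collections.unmodifiableList(new ArrayList<>(resultConsumers));
        }
        if (mode != ScheduleModeSketch.BATCH_FROM_SOURCES) {
            // Manual successors are only valid in batch mode.
            throw new IllegalStateException(
                    "Batch successors configured, but schedule mode is " + mode);
        }
        return Collections.unmodifiableList(new ArrayList<>(batchSuccessors));
    }

    public static void main(String[] args) {
        // src0 has src1 as manual successor, so src1 is scheduled next.
        System.out.println(verticesToSchedule(ScheduleModeSketch.BATCH_FROM_SOURCES,
                Arrays.asList("src1"), Arrays.asList("union"))); // prints [src1]
        // No manual successors: fall back to the consumer of the result.
        System.out.println(verticesToSchedule(ScheduleModeSketch.BATCH_FROM_SOURCES,
                Collections.<String>emptyList(), Arrays.asList("union"))); // prints [union]
    }
}
```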

When you have something like this:
```
        O sink
        |
        .    <- denotes a pipelined result
        O union
  +´----|----`+
  |     |     |
  ■     ■     ■  <--- denotes a blocking result
  O     O     O
 src0  src1  src2
```
You can first schedule `src0`, `src1` and `src2` one after the other, and then continue with 
the `union-sink` pipeline.

```java
src[0].setAsBatchSource(); // src0 is the first to go...

src[0].addBatchSuccessors(src[1]); // src0 => src1

src[1].addBatchSuccessors(src[2]); // src1 => src2

src[2].addBatchSuccessors(union); // src2 => [union => sink]
```
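To make the resulting order explicit, the chain can be replayed with a hypothetical model that only tracks successor lists (`BatchLegOrderSketch` is not part of the PR). It assumes each leg runs to completion before its successors start, and treats `union` and `sink` as one pipelined leg:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model that replays batch successors to obtain the order of legs.
final class BatchLegOrderSketch {

    // Batch successors per vertex, as registered via addBatchSuccessors.
    private final Map<String, List<String>> successors = new HashMap<>();

    void addBatchSuccessors(String vertex, String... next) {
        successors.computeIfAbsent(vertex, k -> new ArrayList<>()).addAll(Arrays.asList(next));
    }

    // Starts at the batch source; a leg's successors become ready once it has finished.
    List<String> scheduleOrder(String batchSource) {
        Set<String> started = new LinkedHashSet<>();
        Deque<String> ready = new ArrayDeque<>();
        ready.add(batchSource);
        while (!ready.isEmpty()) {
            String leg = ready.poll();
            if (!started.add(leg)) {
                continue; // already started, e.g. if successors form a cycle
            }
            // In this model the leg runs to completion here, so its blocking results are finished.
            ready.addAll(successors.getOrDefault(leg, Collections.<String>emptyList()));
        }
        return new ArrayList<>(started);
    }

    public static void main(String[] args) {
        BatchLegOrderSketch job = new BatchLegOrderSketch();
        job.addBatchSuccessors("src0", "src1");
        job.addBatchSuccessors("src1", "src2");
        job.addBatchSuccessors("src2", "union+sink"); // union and sink form one pipelined leg
        System.out.println(job.scheduleOrder("src0")); // prints [src0, src1, src2, union+sink]
    }
}
```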

@StephanEwen or @tillrohrmann will work on the Optimizer/JobGraph 
counterpart of this and will build the `JobGraph` for programs in batch mode 
using the methods introduced in this PR. Do you guys think that this minimal 
support is sufficient for the first version?

(Going over the result partition notification code, I really think refactoring it 
is pressing. It is very hard to understand. The corresponding 
issue [FLINK-1833](https://issues.apache.org/jira/browse/FLINK-1833) was 
created a while back. I want to do this after the release.)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uce/incubator-flink legs-2119

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #754


commit 4ac15982700257d3deb2d55a389afd0531f7f8be
Author: Ufuk Celebi <u...@apache.org>
Date:   2015-06-01T21:12:47Z

[FLINK-2119] Add ExecutionGraph support for batch scheduling



