[3/5] flink git commit: [FLINK-3242] Also Set User-specified StateBackend without Checkpointing

2016-01-28 Thread sewen
[FLINK-3242] Also Set User-specified StateBackend without Checkpointing

Before, the user-specified StateBackedn would not be set when generating the
JobGraph if checkpointing was disabled.

This closes #1516


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b88c2c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b88c2c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b88c2c

Branch: refs/heads/master
Commit: 83b88c2c606f0d36bc04a7250629eb00516af919
Parents: f6d2ce9
Author: Aljoscha Krettek 
Authored: Mon Jan 18 11:53:31 2016 +0100
Committer: Stephan Ewen 
Committed: Thu Jan 28 14:30:28 2016 +0100

--
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../runtime/state/StateBackendITCase.java   | 134 +++
 2 files changed, 135 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 50c6a15..56b16a4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -317,10 +317,10 @@ public class StreamingJobGraphGenerator {
 
final CheckpointConfig ceckpointCfg = 
streamGraph.getCheckpointConfig();

+   config.setStateBackend(streamGraph.getStateBackend());

config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled());
if (ceckpointCfg.isCheckpointingEnabled()) {

config.setCheckpointMode(ceckpointCfg.getCheckpointingMode());
-   config.setStateBackend(streamGraph.getStateBackend());
}
else {
// the "at-least-once" input handler is slightly 
cheaper (in the absence of checkpoints),

http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
new file mode 100644
index 000..cdfef85
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.state;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
+
+   /**
+* Verify that the user-specified state backend is used even if 
checkpointing is disabled.
+*
+* @throws Exception
+*/
+   @Test
+   public void 

flink git commit: [FLINK-3242] Also Set User-specified StateBackend without Checkpointing

2016-01-28 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/release-0.10 dfeee2372 -> 2aeb6fac3


[FLINK-3242] Also Set User-specified StateBackend without Checkpointing

Before, the user-specified StateBackedn would not be set when generating the
JobGraph if checkpointing was disabled.

This closes #1516


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2aeb6fac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2aeb6fac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2aeb6fac

Branch: refs/heads/release-0.10
Commit: 2aeb6fac34f83b88ab888fa9d23ade784712e4b3
Parents: dfeee23
Author: Aljoscha Krettek 
Authored: Mon Jan 18 11:53:31 2016 +0100
Committer: Stephan Ewen 
Committed: Thu Jan 28 14:35:22 2016 +0100

--
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../runtime/state/StateBackendITCase.java   | 134 +++
 2 files changed, 135 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2aeb6fac/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index b5f3cf4..d060078 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -274,10 +274,10 @@ public class StreamingJobGraphGenerator {
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
 
+   config.setStateBackend(streamGraph.getStateBackend());

config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
if (streamGraph.isCheckpointingEnabled()) {

config.setCheckpointMode(streamGraph.getCheckpointingMode());
-   config.setStateBackend(streamGraph.getStateBackend());
} else {
// the at least once input handler is slightly cheaper 
(in the absence of checkpoints),
// so we use that one if checkpointing is not enabled

http://git-wip-us.apache.org/repos/asf/flink/blob/2aeb6fac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
new file mode 100644
index 000..cdfef85
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.state;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
+
+   /**
+* Verify