[3/5] flink git commit: [FLINK-3242] Also Set User-specified StateBackend without Checkpointing
[FLINK-3242] Also Set User-specified StateBackend without Checkpointing Before, the user-specified StateBackend would not be set when generating the JobGraph if checkpointing was disabled. This closes #1516 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/83b88c2c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/83b88c2c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/83b88c2c Branch: refs/heads/master Commit: 83b88c2c606f0d36bc04a7250629eb00516af919 Parents: f6d2ce9 Author: Aljoscha Krettek Authored: Mon Jan 18 11:53:31 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 14:30:28 2016 +0100 -- .../api/graph/StreamingJobGraphGenerator.java | 2 +- .../runtime/state/StateBackendITCase.java | 134 +++ 2 files changed, 135 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 50c6a15..56b16a4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -317,10 +317,10 @@ public class StreamingJobGraphGenerator { final CheckpointConfig ceckpointCfg = streamGraph.getCheckpointConfig(); + config.setStateBackend(streamGraph.getStateBackend()); config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled()); if (ceckpointCfg.isCheckpointingEnabled()) { config.setCheckpointMode(ceckpointCfg.getCheckpointingMode()); - config.setStateBackend(streamGraph.getStateBackend()); } else { // the "at-least-once" input handler is slightly cheaper (in the 
absence of checkpoints), http://git-wip-us.apache.org/repos/asf/flink/blob/83b88c2c/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java new file mode 100644 index 000..cdfef85 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.state; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertTrue; + + +public class StateBackendITCase extends StreamingMultipleProgramsTestBase { + + /** +* Verify that the user-specified state backend is used even if checkpointing is disabled. +* +* @throws Exception +*/ + @Test + public void
flink git commit: [FLINK-3242] Also Set User-specified StateBackend without Checkpointing
Repository: flink Updated Branches: refs/heads/release-0.10 dfeee2372 -> 2aeb6fac3 [FLINK-3242] Also Set User-specified StateBackend without Checkpointing Before, the user-specified StateBackend would not be set when generating the JobGraph if checkpointing was disabled. This closes #1516 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2aeb6fac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2aeb6fac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2aeb6fac Branch: refs/heads/release-0.10 Commit: 2aeb6fac34f83b88ab888fa9d23ade784712e4b3 Parents: dfeee23 Author: Aljoscha Krettek Authored: Mon Jan 18 11:53:31 2016 +0100 Committer: Stephan Ewen Committed: Thu Jan 28 14:35:22 2016 +0100 -- .../api/graph/StreamingJobGraphGenerator.java | 2 +- .../runtime/state/StateBackendITCase.java | 134 +++ 2 files changed, 135 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2aeb6fac/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index b5f3cf4..d060078 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -274,10 +274,10 @@ public class StreamingJobGraphGenerator { config.setNonChainedOutputs(nonChainableOutputs); config.setChainedOutputs(chainableOutputs); + config.setStateBackend(streamGraph.getStateBackend()); config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled()); if (streamGraph.isCheckpointingEnabled()) { config.setCheckpointMode(streamGraph.getCheckpointingMode()); - 
config.setStateBackend(streamGraph.getStateBackend()); } else { // the at least once input handler is slightly cheaper (in the absence of checkpoints), // so we use that one if checkpointing is not enabled http://git-wip-us.apache.org/repos/asf/flink/blob/2aeb6fac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java new file mode 100644 index 000..cdfef85 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.runtime.state; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.KvState; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; +import org.junit.Test; + +import java.io.Serializable; + +import static org.junit.Assert.assertTrue; + + +public class StateBackendITCase extends StreamingMultipleProgramsTestBase { + + /** +* Verify