Repository: flink
Updated Branches:
  refs/heads/release-0.10 dfeee2372 -> 2aeb6fac3


[FLINK-3242] Also Set User-specified StateBackend without Checkpointing

Previously, the user-specified StateBackend was not set when generating the
JobGraph if checkpointing was disabled.

This closes #1516


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2aeb6fac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2aeb6fac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2aeb6fac

Branch: refs/heads/release-0.10
Commit: 2aeb6fac34f83b88ab888fa9d23ade784712e4b3
Parents: dfeee23
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Jan 18 11:53:31 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Jan 28 14:35:22 2016 +0100

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../runtime/state/StateBackendITCase.java       | 134 +++++++++++++++++++
 2 files changed, 135 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2aeb6fac/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index b5f3cf4..d060078 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -274,10 +274,10 @@ public class StreamingJobGraphGenerator {
                config.setNonChainedOutputs(nonChainableOutputs);
                config.setChainedOutputs(chainableOutputs);
 
+               config.setStateBackend(streamGraph.getStateBackend());
                
config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
                if (streamGraph.isCheckpointingEnabled()) {
                        
config.setCheckpointMode(streamGraph.getCheckpointingMode());
-                       config.setStateBackend(streamGraph.getStateBackend());
                } else {
                        // the at least once input handler is slightly cheaper 
(in the absence of checkpoints),
                        // so we use that one if checkpointing is not enabled

http://git-wip-us.apache.org/repos/asf/flink/blob/2aeb6fac/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
new file mode 100644
index 0000000..cdfef85
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/state/StateBackendITCase.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.state;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class StateBackendITCase extends StreamingMultipleProgramsTestBase {
+
+       /**
+        * Verify that the user-specified state backend is used even if 
checkpointing is disabled.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testStateBackendWithoutCheckpointing() throws Exception {
+
+               StreamExecutionEnvironment see = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               see.setParallelism(1);
+
+               see.setNumberOfExecutionRetries(0);
+               see.setStateBackend(new FailingStateBackend());
+
+
+               see.fromElements(new Tuple2<>("Hello", 1))
+                       .keyBy(0)
+                       .map(new RichMapFunction<Tuple2<String,Integer>, 
String>() {
+                               private static final long serialVersionUID = 1L;
+
+                               @Override
+                               public void open(Configuration parameters) 
throws Exception {
+                                       super.open(parameters);
+                                       
getRuntimeContext().getKeyValueState("test", String.class, "");
+                               }
+
+                               @Override
+                               public String map(Tuple2<String, Integer> 
value) throws Exception {
+                                       return value.f0;
+                               }
+                       })
+                       .print();
+
+               boolean caughtSuccess = false;
+               try {
+                       see.execute();
+               } catch (JobExecutionException e) {
+                       if (e.getCause() instanceof SuccessException) {
+                               caughtSuccess = true;
+                       } else {
+                               throw e;
+                       }
+               }
+
+               assertTrue(caughtSuccess);
+       }
+
+
+       public static class FailingStateBackend extends 
StateBackend<FailingStateBackend> {
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void initializeForJob(Environment env) throws Exception {
+                       throw new SuccessException();
+               }
+
+               @Override
+               public void disposeAllStateForCurrentJob() throws Exception {
+
+               }
+
+               @Override
+               public void close() throws Exception {
+
+               }
+
+               @Override
+               public <K, V> KvState<K, V, FailingStateBackend> 
createKvState(String stateId,
+                       String stateName,
+                       TypeSerializer<K> keySerializer,
+                       TypeSerializer<V> valueSerializer,
+                       V defaultValue) throws Exception {
+                       return null;
+               }
+
+               @Override
+               public CheckpointStateOutputStream 
createCheckpointStateOutputStream(long checkpointID,
+                       long timestamp) throws Exception {
+                       return null;
+               }
+
+               @Override
+               public <S extends Serializable> StateHandle<S> 
checkpointStateSerializable(S state,
+                       long checkpointID,
+                       long timestamp) throws Exception {
+                       return null;
+               }
+       }
+
+       static final class SuccessException extends Exception {
+               private static final long serialVersionUID = 
-9218191172606739598L;
+       }
+
+}

Reply via email to