diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
index 934324607f0..a3e2668c26b 100644
--- a/flink-jepsen/README.md
+++ b/flink-jepsen/README.md
@@ -5,10 +5,11 @@ distributed coordination of Apache FlinkĀ®.
 ## Test Coverage
 Jepsen is a framework built to test the behavior of distributed systems
-under faults. The tests in this particular project deploy Flink on either YARN 
or Mesos, submit a
+under faults. The tests in this particular project deploy Flink on YARN, 
Mesos, or as a standalone cluster, submit a
 job, and examine the availability of the job after injecting faults.
 A job is said to be available if all the tasks of the job are running.
 The faults that can be currently introduced to the Flink cluster include:
 * Killing of TaskManager/JobManager processes
 * Stopping HDFS NameNode
 * Network partitions
diff --git a/flink-jepsen/project.clj b/flink-jepsen/project.clj
index 78935d71187..8c3e8451536 100644
--- a/flink-jepsen/project.clj
+++ b/flink-jepsen/project.clj
@@ -18,6 +18,7 @@
   :license {:name "Apache License"
             :url  "http://www.apache.org/licenses/LICENSE-2.0"}
   :main jepsen.flink.flink
+  :aot [jepsen.flink.flink]
   :dependencies [[org.clojure/clojure "1.9.0"],
                  [cheshire "5.8.0"]
                  [clj-http "3.8.0"]
diff --git a/flink-jepsen/scripts/run-tests.sh 
index e44812402f6..a2b256b6f6a 100755
--- a/flink-jepsen/scripts/run-tests.sh
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -36,8 +36,15 @@ do
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers 
--deployment-mode yarn-session
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers 
--deployment-mode yarn-session
   lein run test "${common_jepsen_args[@]}" --nemesis-gen 
fail-name-node-during-recovery --deployment-mode yarn-session
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers 
--deployment-mode yarn-job
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers 
--deployment-mode yarn-job
   lein run test "${common_jepsen_args[@]}" --nemesis-gen 
fail-name-node-during-recovery --deployment-mode yarn-job
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers 
--deployment-mode mesos-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers 
--deployment-mode mesos-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers 
--deployment-mode standalone-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers 
--client-gen cancel-job --deployment-mode standalone-session
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj 
index 02cc863bef5..7e437e9d628 100644
--- a/flink-jepsen/src/jepsen/flink/checker.clj
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -48,59 +48,126 @@
   [[_ v]]
   (zero? v))
+(defn- set-job-not-running
+  [model] (assoc model :healthy-count 0))
+(defn- track-job-running
+  [model]
+  (update model :healthy-count inc))
+(defn- elapsed-seconds
+  [start end]
+  (ju/nanos->secs (- end start)))
+(defn- should-cluster-be-healthy?
+  [model op]
+  (let [{:keys [active-nemeses last-failure job-recovery-grace-period]} model]
+    (and
+      (not (nemeses-active? active-nemeses))
+      (> (elapsed-seconds last-failure (:time op)) 
+(defn- start-fault
+  [model op]
+  (let [{:keys [active-nemeses]} model]
+    (assoc
+      model
+      :active-nemeses (update active-nemeses
+                              (strip-op-suffix op)
+                              safe-inc))))
+(defn- stop-fault
+  [model op]
+  (let [{:keys [active-nemeses]} model]
+    (assoc
+      model
+      :active-nemeses (dissoc-if zero-value?
+                                 (update active-nemeses (strip-op-suffix op) 
+      :last-failure (:time op))))
+(defn- job-allowed-to-be-running?
+  [model op]
+  (let [{:keys [job-canceled? job-canceled-time 
job-cancellation-grace-period]} model
+        now (:time op)]
+    (cond
+      (not job-canceled?) true
+      :else (> job-cancellation-grace-period (elapsed-seconds 
job-canceled-time now)))))
+(defn- handle-job-running?-op
+  "Returns the new model for an op {:f :job-running? ...}."
+  [model op]
+  (assert (#{:ok :fail :info} (:type op)) "Unexpected type")
+  (let [{:keys [job-canceled?]} model
+        job-running (:value op)
+        request-failed (#{:info :fail} (:type op))]
+    (if (and request-failed
+             (should-cluster-be-healthy? model op))
+      (model/inconsistent "Cluster is not running.")
+      (if job-running                                       ; cluster is 
running, check if job is running
+        (if (job-allowed-to-be-running? model op)           ; job is running 
but is it supposed to be running?
+          (track-job-running model)
+          (model/inconsistent
+            "Job is running after cancellation."))
+        (if (and                                            ; job is not 
+              (should-cluster-be-healthy? model op)
+              (not job-canceled?))
+          (model/inconsistent "Job is not running.")        ; job is not 
running but it should be running because grace period passed
+          (set-job-not-running model))))))
-  ^{:doc "A Model which is consistent iff. the Flink job became available 
+  ^{:doc "A Model which is consistent if the Flink job and the Flink cluster 
became available within
   `job-recovery-grace-period` seconds after the last fault injected by the 
   Note that some faults happen at a single point in time (e.g., killing of 
processes). Other faults,
   such as network splits, happen during a period of time, and can thus be 
interleaving. As long as
-  there are active faults, the job is allowed not to be available."}
+  there are active faults, the job and the cluster are allowed to be 
+  Note that this model assumes that the client dispatches the operations 
reliably, i.e., in case of
+  exceptions, the operations are retried or failed fatally."}
   [active-nemeses                                           ; stores active 
    healthy-count                                            ; how many 
consecutive times was the job running?
    last-failure                                             ; timestamp when 
the last failure was injected/ended
    healthy-threshold                                        ; after how many 
times is the job considered healthy
-   job-recovery-grace-period]                               ; after how many 
seconds should the job be recovered
+   job-recovery-grace-period                                ; after how many 
seconds should the job be recovered
+   job-cancellation-grace-period                            ; after how many 
seconds should the job be canceled?
+   job-canceled?                                            ; is the job 
+   job-canceled-time]                                       ; timestamp of 
   (step [this op]
     (case (:process op)
       :nemesis (cond
                  (nil? (:value op)) this
-                 (stoppable-op? op) (assoc
-                                      this
-                                      :active-nemeses (update active-nemeses
-                                                              (strip-op-suffix 
-                                                              safe-inc))
-                 (stop-op? op) (assoc
-                                 this
-                                 :active-nemeses (dissoc-if zero-value?
-                                                            (update 
active-nemeses (strip-op-suffix op) dec))
-                                 :last-failure (:time op))
+                 (stoppable-op? op) (start-fault this op)
+                 (stop-op? op) (stop-fault this op)
                  :else (assoc this :last-failure (:time op)))
-      (case (:f op)
-        :job-running? (case (:type op)
-                        :info this                          ; ignore :info 
-                        :fail this                          ; ignore :fail 
-                        :invoke this                        ; ignore :invoke 
-                        :ok (if (:value op)                 ; check if job is 
-                              (assoc                        ; job is running
-                                this
-                                :healthy-count
-                                (inc healthy-count))
-                              (if (and                      ; job is not 
-                                    (not (nemeses-active? active-nemeses))
-                                    (< healthy-count healthy-threshold)
-                                    (> (ju/nanos->secs (- (:time op) 
last-failure)) job-recovery-grace-period))
-                                ; job is not running but it should be running
-                                ; because grace period passed
-                                (model/inconsistent "Job is not running.")
-                                (conj this
-                                      [:healthy-count 0]))))
-        ; ignore other client operations
-        this))))
+      (if (= :invoke (:type op))
+        this                                                ; ignore :invoke 
+        (case (:f op)
+          :job-running? (handle-job-running?-op this op)
+          :cancel-job (do
+                        (assert (= :ok (:type op)) ":cancel-job must not fail")
+                        (assoc this :job-canceled? true :job-canceled-time 
(:time op)))
+          ; ignore other client operations
+          this)))))
 (defn job-running-within-grace-period
-  [job-running-healthy-threshold job-recovery-grace-period]
-  (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold 
+  ([job-running-healthy-threshold job-recovery-grace-period 
+   (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold 
job-recovery-grace-period job-cancellation-grace-period false nil))
+  ([job-running-healthy-threshold job-recovery-grace-period]
+   (job-running-within-grace-period job-running-healthy-threshold 
job-recovery-grace-period 10)))
+(defn get-job-running-history
+  [history]
+  (->>
+    history
+    (remove #(= (:process %) :nemesis))
+    (remove #(= (:type %) :invoke))
+    (map :value)
+    (map boolean)
+    (remove nil?)))
+(defn- healthy?
+  [model]
+  (>= (:healthy-count model) (:healthy-threshold model)))
 (defn job-running-checker
@@ -111,18 +178,11 @@
             result-map (conj {}
                              (find test :nemesis-gen)
                              (find test :deployment-mode))]
-        (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
-          (into result-map {:valid? false
-                            :error  (:msg final)})
+        (if (or (model/inconsistent? final)
+                (and
+                  (not (healthy? final))
+                  (not (:job-canceled? final))))
+          (into result-map {:valid?      false
+                            :final-model final})
           (into result-map {:valid?      true
                             :final-model final}))))))
-(defn get-job-running-history
-  [history]
-  (->>
-    history
-    (remove #(= (:process %) :nemesis))
-    (remove #(= (:type %) :invoke))
-    (map :value)
-    (map boolean)
-    (remove nil?)))
diff --git a/flink-jepsen/src/jepsen/flink/client.clj 
index 905dc48911f..1ab987bd704 100644
--- a/flink-jepsen/src/jepsen/flink/client.clj
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -47,6 +47,12 @@
   (info "Waiting for path" path "in ZK.")
   (wait-for-zk-operation zk-client zk/exists path))
+(defn get-only-application-id
+  [coll]
+  (assert (= 1 (count coll)) (str "Expected 1 application id, got " coll ". "
+                                  "Failed to deploy the Flink cluster, or 
there are lingering Flink clusters."))
+  (first coll))
 (defn wait-for-children-to-exist
   [zk-client path]
   (wait-for-zk-operation zk-client zk/children path))
@@ -60,7 +66,7 @@
       (wait-for-children-to-exist zk-client "/flink")
-      (first))))
+      (get-only-application-id))))
 (defn watch-node-bytes
   [zk-client path callback]
@@ -97,54 +103,100 @@
     (map :id)))
-(defn get-job-details!
-  [base-url job-id]
-  (assert base-url)
-  (assert job-id)
-  (let [job-details (->
-                      (http/get (str base-url "/jobs/" job-id) {:as :json})
-                      :body)]
-    (assert (:vertices job-details) "Job does not have vertices")
-    job-details))
 (defn job-running?
   [base-url job-id]
-  (->>
-    (get-job-details! base-url job-id)
-    :vertices
-    (map :status)
-    (every? #(= "RUNNING" %))))
+  (let [response (http/get (str base-url "/jobs/" job-id) {:as :json 
:throw-exceptions false})
+        body (:body response)
+        error (:errors body)]
+    (cond
+      (http/missing? response) false
+      (not (http/success? response)) (throw (ex-info "Could not determine if 
job is running" {:job-id job-id :error error}))
+      :else (do
+              (assert (:vertices body) "Job does not have vertices")
+              (->>
+                body
+                :vertices
+                (map :status)
+                (every? #(= "RUNNING" %)))))))
+(defn- cancel-job!
+  "Cancels the specified job. Returns true if the job could be canceled.
+  Returns false if the job does not exist. Throws an exception if the HTTP 
+  is not successful."
+  [base-url job-id]
+  (let [response (http/patch (str base-url "/jobs/" job-id) {:as :json 
:throw-exceptions false})
+        error (-> response :body :errors)]
+    (cond
+      (http/missing? response) false
+      (not (http/success? response)) (throw (ex-info "Job cancellation 
unsuccessful" {:job-id job-id :error error}))
+      :else true)))
+(defmacro dispatch-operation
+  [op & body]
+  `(try
+     (assoc ~op :type :ok :value ~@body)
+     (catch Exception e# (do
+                           (warn e# "An exception occurred while running" 
(quote ~@body))
+                           (assoc ~op :type :fail :error (.getMessage e#))))))
+(defmacro dispatch-operation-or-fatal
+  "Dispatches op by evaluating body, retrying a number of times if needed.
+  Fails fatally if all retries are exhausted."
+  [op & body]
+  `(assoc ~op :type :ok :value (fu/retry (fn [] ~@body) :fallback (fn [e#]
+                                                                    (fatal e# 
"Required operation did not succeed" (quote ~@body))
(System/exit 1)))))
+(defn- dispatch-rest-operation!
+  [rest-url job-id op]
+  (assert job-id)
+  (if-not rest-url
+    (assoc op :type :fail :error "Have not determined REST URL yet.")
+    (case (:f op)
+      :job-running? (dispatch-operation op (fu/retry
+                                             (partial job-running? rest-url 
+                                             :retries 3
+                                             :fallback #(throw %)))
+      :cancel-job (dispatch-operation-or-fatal op (cancel-job! rest-url 
 (defrecord Client
-  [deploy-cluster! closer rest-url init-future job-id]
+  [deploy-cluster!                                          ; function that 
starts a non-standalone cluster and submits the job
+   closer                                                   ; function that 
closes the ZK client
+   rest-url                                                 ; atom storing the 
current rest-url
+   init-future                                              ; future that 
completes if rest-url is set to an initial value
+   job-id                                                   ; atom storing the 
+   job-submitted?]                                          ; Has the job 
already been submitted? Used to avoid re-submission if the client is re-opened.
-  (open! [this test node]
+  (open! [this test _]
+    (info "Open client.")
     (let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url 
-      (assoc this :closer closer :rest-url rest-url-atom :init-future 
init-future :job-id (atom nil))))
-  (setup! [this test] this)
-  (invoke! [this test op]
-    (case (:f op)
-      :submit (do
-                (deploy-cluster! test)
-                (deref init-future)
-                (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
-                                     :fallback (fn [e] (do
-                                                         (fatal e "Could not 
get running jobs.")
-                                                         (System/exit 1))))
-                      num-jobs (count jobs)]
-                  (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
-                  (reset! job-id (first jobs)))
-                (assoc op :type :ok))
-      :job-running? (let [base-url @rest-url]
-                      (if base-url
-                        (try
-                          (assoc op :type :ok :value (job-running? base-url 
-                          (catch Exception e (do
-                                               (warn e "Get job details from" 
base-url "failed.")
-                                               (assoc op :type :fail))))
-                        (assoc op :type :fail :value "Cluster not deployed 
-  (teardown! [this test])
-  (close! [this test] (closer)))
+      (assoc this :closer closer
+                  :rest-url rest-url-atom
+                  :init-future init-future)))
+  (setup! [_ test]
+    (info "Setup client.")
+    (when (compare-and-set! job-submitted? false true)
+      (deploy-cluster! test)
+      (deref init-future)
+      (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+                           :fallback (fn [e]
+                                       (fatal e "Could not get running jobs.")
+                                       (System/exit 1)))
+            num-jobs (count jobs)
+            job (first jobs)]
+        (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+        (info "Submitted job" job)
+        (reset! job-id job))))
+  (invoke! [_ _ op]
+    (dispatch-rest-operation! @rest-url @job-id op))
+  (teardown! [_ _])
+  (close! [_ _]
+    (info "Closing client.")
+    (closer)))
+(defn create-client
+  [deploy-cluster!]
+  (Client. deploy-cluster! nil nil nil (atom nil) (atom false)))
diff --git a/flink-jepsen/src/jepsen/flink/db.clj 
index 79ed8a45b4d..e0f5ff856d4 100644
--- a/flink-jepsen/src/jepsen/flink/db.clj
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -36,21 +36,22 @@
 (def conf-file (str install-dir "/conf/flink-conf.yaml"))
 (def masters-file (str install-dir "/conf/masters"))
-(def default-flink-dist-url 
+(def default-flink-dist-url 
 (def hadoop-dist-url 
 (def deb-zookeeper-package "3.4.9-3+deb8u1")
 (def deb-mesos-package "1.5.0-2.0.2")
 (def deb-marathon-package "1.6.322")
 (def taskmanager-slots 1)
-(def master-count 1)
 (defn flink-configuration
-  [test]
+  [test node]
   {:high-availability                  "zookeeper"
    :high-availability.zookeeper.quorum (zookeeper-quorum test)
    :high-availability.storageDir       (str (:ha-storage-dir test) "/ha")
+   :jobmanager.rpc.address             node
    :state.savepoints.dir               (str (:ha-storage-dir test) 
+   :rest.address                       node
    :rest.port                          8081
    :rest.bind-address                  ""
    :taskmanager.numberOfTaskSlots      taskmanager-slots
@@ -59,23 +60,17 @@
    :state.backend.local-recovery       "false"
    :taskmanager.registration.timeout   "30 s"})
-(defn master-nodes
-  [test]
-  (take master-count (sort (:nodes test))))
 (defn write-configuration!
-  "Writes the flink-conf.yaml and masters file to the flink conf directory"
-  [test]
+  "Writes the flink-conf.yaml to the flink conf directory"
+  [test node]
   (let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
-                                         (seq (flink-configuration test))))
-        m (clojure.string/join "\n" (master-nodes test))]
+                                         (seq (flink-configuration test 
     (c/exec :echo c :> conf-file)
-    (c/exec :echo m :> masters-file)
     ;; TODO: write log4j.properties properly
     (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, 
file/' " install-dir "/conf/log4j.properties")))))
 (defn install-flink!
-  [test]
+  [test node]
   (let [url (:tarball test)]
     (info "Installing Flink from" url)
     (cu/install-archive! url install-dir)
@@ -83,12 +78,12 @@
     (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I 
{} mv {} " install-dir "/lib")))
     (c/upload (:job-jar test) upload-dir)
     (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar 
test)))) install-dir)
-    (write-configuration! test)))
+    (write-configuration! test node)))
 (defn teardown-flink!
   (info "Tearing down Flink")
-  (cu/grepkill! "flink")
+  (meh (cu/grepkill! "flink"))
   (meh (c/exec :rm :-rf install-dir))
   (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
@@ -101,7 +96,7 @@
   (reify db/DB
     (setup! [_ test node]
-        (install-flink! test)))
+        (install-flink! test node)))
     (teardown! [_ test node]
@@ -120,26 +115,49 @@
         (doall (map #(db/setup! % test node) dbs))))
     (teardown! [_ test node]
-        (doall (map #(db/teardown! % test node) dbs))))
+        (try
+          (doall (map #(db/teardown! % test node) dbs))
+          (finally (fu/stop-all-supervised-services!)))))
     (log-files [_ test node]
-      (flatten (map #(db/log-files % test node) dbs)))))
+      (->>
+        (filter (partial satisfies? db/LogFiles) dbs)
+        (map #(db/log-files % test node))
+        (flatten)))))
-;;; YARN
+(defn- sorted-nodes
+  [test]
+  (-> test :nodes sort))
-(defn flink-yarn-db
+(defn- select-nodes
+  [test selector]
+  (-> (sorted-nodes test)
+      selector))
+(defn- first-node
+  [test]
+  (select-nodes test first))
+(defn- create-env-vars
+  "Expects a map containing environment variables, and returns a string that 
can be used to set
+  environment variables for a child process using Bash's quick assignment and 
inheritance trick.
+  For example, for a map {:FOO \"bar\"}, this function returns \"FOO=bar \"."
+  [m]
+  (->>
+    (map #(str (name (first %)) "=" (second %)) m)
+    (clojure.string/join " ")
+    (#(str % " "))))
+(defn- hadoop-env-vars
-  (let [zk (zk/db deb-zookeeper-package)
-        hadoop (hadoop/db hadoop-dist-url)
-        flink (flink-db)]
-    (combined-db [hadoop zk flink])))
+  (create-env-vars {:HADOOP_CLASSPATH (str "`" hadoop/install-dir "/bin/hadoop 
+                    :HADOOP_CONF_DIR  hadoop/hadoop-conf-dir}))
 (defn exec-flink!
-  [test cmd args]
+  [cmd args]
     (c/exec (c/lit (str
-                     "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop 
classpath` "
-                     "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                     (hadoop-env-vars)
                      install-dir "/bin/flink " cmd " " args)))))
 (defn flink-run-cli-args
@@ -149,24 +167,84 @@
     (if (:main-class test)
       [(str "-c " (:main-class test))]
-      [])
-    (if (= :yarn-job (:deployment-mode test))
-      ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
 (defn submit-job!
   ([test] (submit-job! test []))
   ([test cli-args]
-   (exec-flink! test "run" (clojure.string/join
-                             " "
-                             (concat cli-args
-                                     (flink-run-cli-args test)
-                                     [(str install-dir "/" (last (str/split 
(:job-jar test) #"/")))
-                                      (:job-args test)])))))
-(defn first-node
+   (exec-flink! "run" (clojure.string/join
+                        " "
+                        (concat cli-args
+                                (flink-run-cli-args test)
+                                [(str install-dir "/" (last (str/split 
(:job-jar test) #"/")))
+                                 (:job-args test)])))))
+;;; Standalone
+(def standalone-master-count 2)
+(defn- standalone-master-nodes
-  (-> test :nodes sort first))
+  (select-nodes test (partial take standalone-master-count)))
+(defn- standalone-taskmanager-nodes
+  [test]
+  (select-nodes test (partial drop standalone-master-count)))
+(defn- start-standalone-masters!
+  [test node]
+  (when (some #{node} (standalone-master-nodes test))
+    (fu/create-supervised-service!
+      "flink-master"
+      (str "env " (hadoop-env-vars)
+           install-dir "/bin/jobmanager.sh start-foreground "
+           ">> " log-dir "/jobmanager.log"))))
+(defn- start-standalone-taskmanagers!
+  [test node]
+  (when (some #{node} (standalone-taskmanager-nodes test))
+    (fu/create-supervised-service!
+      "flink-taskmanager"
+      (str "env " (hadoop-env-vars)
+           install-dir "/bin/taskmanager.sh start-foreground "
+           ">> " log-dir "/taskmanager.log"))))
+(defn- start-flink-db
+  []
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (start-standalone-masters! test node)
+        (start-standalone-taskmanagers! test node)))
+    (teardown! [_ test node]
+      (c/su
+        (when (some #{node} (standalone-master-nodes test))
+          (fu/stop-supervised-service! "flink-master"))
+        (when (some #{node} (standalone-taskmanager-nodes test))
+          (fu/stop-supervised-service! "flink-taskmanager"))))))
+(defn flink-standalone-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)
+        start-flink (start-flink-db)]
+    (combined-db [hadoop zk flink start-flink])))
+(defn submit-job-from-first-node!
+  [test]
+  (c/on (first-node test)
+        (submit-job! test)))
+;;; YARN
+(defn flink-yarn-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)]
+    (combined-db [hadoop zk flink])))
 (defn start-yarn-session!
@@ -174,8 +252,7 @@
     (c/on node
           (info "Starting YARN session from" node)
-            (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir 
"/bin/hadoop classpath` "
-                                "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+            (c/exec (c/lit (str (hadoop-env-vars)
                                 " " install-dir "/bin/yarn-session.sh -d -jm 
2048m -tm 2048m")))
             (submit-job! test)))))
@@ -183,7 +260,7 @@
   (c/on (first-node test)
-          (submit-job! test))))
+          (submit-job! test ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]))))
 ;;; Mesos
@@ -203,6 +280,23 @@
                         (fatal e "Could not submit job.")
                         (System/exit 1)))))
+(defn mesos-appmaster-cmd
+  "Returns the command used by Marathon to start Flink's Mesos application 
+  [test]
+  (str (hadoop-env-vars)
+       install-dir "/bin/mesos-appmaster.sh "
+       "-Dmesos.master=" (zookeeper-uri
+                           test
+                           mesos/zk-namespace) " "
+       "-Djobmanager.rpc.address=$(hostname -f) "
+       "-Djobmanager.heap.mb=2048 "
+       "-Djobmanager.rpc.port=6123 "
+       "-Dmesos.resourcemanager.tasks.mem=2048 "
+       "-Dtaskmanager.heap.mb=2048 "
+       "-Dtaskmanager.numberOfTaskSlots=2 "
+       "-Dmesos.resourcemanager.tasks.cpus=1 "
+       "-Drest.bind-address=$(hostname -f) "))
 (defn start-mesos-session!
@@ -210,21 +304,7 @@
                           (str (mesos/marathon-base-url test) "/v2/apps")
                           {:form-params  {:id                    "flink"
-                                          :cmd                   (str 
"HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
"HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
install-dir "/bin/mesos-appmaster.sh "
"-Dmesos.master=" (zookeeper-uri
           mesos/zk-namespace) " "
"-Djobmanager.rpc.address=$(hostname -f) "
"-Djobmanager.heap.mb=2048 "
"-Djobmanager.rpc.port=6123 "
"-rest.port=8081 "
"-Dmesos.resourcemanager.tasks.mem=2048 "
"-Dtaskmanager.heap.mb=2048 "
"-Dtaskmanager.numberOfTaskSlots=2 "
"-Dmesos.resourcemanager.tasks.cpus=1 "
"-Drest.bind-address=$(hostname -f) ")
+                                          :cmd                   
(mesos-appmaster-cmd test)
                                           :cpus                  1.0
                                           :mem                   2048
                                           :maxLaunchDelaySeconds 3}
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj 
index d5d41579c38..c5d0d225932 100644
--- a/flink-jepsen/src/jepsen/flink/flink.clj
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -15,6 +15,7 @@
 ;; limitations under the License.
 (ns jepsen.flink.flink
+  (:gen-class)
   (:require [clojure.tools.logging :refer :all]
              [cli :as cli]
@@ -24,31 +25,48 @@
             [jepsen.flink.client :refer :all]
             [jepsen.flink.checker :as flink-checker]
             [jepsen.flink.db :as fdb]
-            [jepsen.flink.nemesis :as fn])
-  (:import (jepsen.flink.client Client)))
+            [jepsen.flink.nemesis :as fn]))
 (def flink-test-config
-  {:yarn-session  {:db                  (fdb/flink-yarn-db)
-                   :deployment-strategy fdb/start-yarn-session!}
-   :yarn-job      {:db                  (fdb/flink-yarn-db)
-                   :deployment-strategy fdb/start-yarn-job!}
-   :mesos-session {:db                  (fdb/flink-mesos-db)
-                   :deployment-strategy fdb/start-mesos-session!}})
+  {:yarn-session       {:db                  (fdb/flink-yarn-db)
+                        :deployment-strategy fdb/start-yarn-session!}
+   :yarn-job           {:db                  (fdb/flink-yarn-db)
+                        :deployment-strategy fdb/start-yarn-job!}
+   :mesos-session      {:db                  (fdb/flink-mesos-db)
+                        :deployment-strategy fdb/start-mesos-session!}
+   :standalone-session {:db                  (fdb/flink-standalone-db)
+                        :deployment-strategy fdb/submit-job-from-first-node!}})
-(defn client-gen
+(def poll-job-running {:type :invoke, :f :job-running?, :value nil})
+(def cancel-job {:type :invoke, :f :cancel-job, :value nil})
+(def poll-job-running-loop (gen/seq (cycle [poll-job-running (gen/sleep 5)])))
+(defn default-client-gen
+  "Client generator that polls for the job running status."
-    (cons {:type :invoke, :f :submit, :value nil}
-          (cycle [{:type :invoke, :f :job-running?, :value nil}
-                  (gen/sleep 5)]))
-    (gen/seq)
+    poll-job-running-loop
+(defn cancelling-client-gen
+  "Client generator that polls for the job running status, and cancels the job 
after 15 seconds."
+  []
+  (->
+    (gen/concat (gen/time-limit 15 (default-client-gen))
+                (gen/once cancel-job)
+                (default-client-gen))
+    (gen/singlethreaded)))
+(def client-gens
+  {:poll-job-running default-client-gen
+   :cancel-job       cancelling-client-gen})
 (defn flink-test
   (merge tests/noop-test
          (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode 
-               {:keys [job-running-healthy-threshold 
job-recovery-grace-period]} opts]
+               {:keys [job-running-healthy-threshold 
job-recovery-grace-period]} opts
+               client-gen ((:client-gen opts) client-gens)]
            {:name      "Apache Flink"
             :os        debian/os
             :db        db
@@ -63,7 +81,7 @@
((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
-            :client    (Client. deployment-strategy nil nil nil nil)
+            :client    (create-client deployment-strategy)
             :checker   (flink-checker/job-running-checker)})
          (assoc opts :concurrency 1)))
@@ -93,6 +111,12 @@
                      :default :kill-task-managers
                      :validate [#(fn/nemesis-generator-factories (keyword %))
+                    [nil "--client-gen GEN" (str "Which client should be used?"
(keys-as-allowed-values-help-text client-gens))
+                     :parse-fn keyword
+                     :default :poll-job-running
+                     :validate [#(client-gens (keyword %))
+                                (keys-as-allowed-values-help-text 
                     [nil "--deployment-mode MODE" 
(keys-as-allowed-values-help-text flink-test-config)
                      :parse-fn keyword
                      :default :yarn-session
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj 
index a73f25fd489..aef73598da9 100644
--- a/flink-jepsen/src/jepsen/flink/mesos.clj
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -22,36 +22,9 @@
              [util :as util :refer [meh]]]
             [jepsen.control.util :as cu]
             [jepsen.os.debian :as debian]
+            [jepsen.flink.utils :refer [create-supervised-service! 
             [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
-;;; runit process supervisor (http://smarden.org/runit/)
-;;; We use runit to supervise Mesos processes because Mesos uses a "fail-fast" 
approach to
-;;; error handling, e.g., the Mesos master will exit when it discovers it has 
been partitioned away
-;;; from the Zookeeper quorum.
-(def runit-version "2.1.2-3")
-(defn create-supervised-service!
-  "Registers a service with the process supervisor and starts it."
-  [service-name cmd]
-  (let [service-dir (str "/etc/sv/" service-name)
-        run-script (str service-dir "/run")]
-    (c/su
-      (c/exec :mkdir :-p service-dir)
-      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
-                                               "exec 2>&1"
-                                               (str "exec " cmd)]) :> 
-      (c/exec :chmod :+x run-script)
-      (c/exec :ln :-sf service-dir (str "/etc/service/" service-name)))))
-(defn stop-supervised-service!
-  "Stops a service and removes it from supervision."
-  [service-name]
-  (c/su
-    (c/exec :sv :down service-name)
-    (c/exec :rm :-f (str "/etc/service/" service-name))))
 ;;; Mesos
 (def master-count 1)
@@ -154,8 +127,7 @@
     (debian/install {:mesos    mesos-version
-                     :marathon marathon-version
-                     :runit    runit-version})
+                     :marathon marathon-version})
     (c/exec :mkdir :-p "/var/run/mesos")
     (c/exec :mkdir :-p master-dir)
     (c/exec :mkdir :-p slave-dir)))
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj 
index 3047eeb9cc1..5335bba874c 100644
--- a/flink-jepsen/src/jepsen/flink/nemesis.clj
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -86,6 +86,11 @@
     (take n)
+(defn- inc-by-factor
+  [n factor]
+  (assert (>= factor 1))
+  (int (* n factor)))
 (defn stop-generator
   [stop source job-running-healthy-threshold job-recovery-grace-period]
   (gen/concat source
@@ -105,9 +110,12 @@
job-running-healthy-threshold false))]
                       (if (or
-                            (and
-                              (every? true? job-running-history))
-                            (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos 
+                            (every? true? job-running-history)
+                            (> (ju/relative-time-nanos) (+ @t
+                                                           (ju/secs->nanos
+                                                             (inc-by-factor
+                                                               1.1)))))
                           (reset! stop true)
@@ -122,14 +130,14 @@
 (defn kill-taskmanagers-bursts-gen
   (fgen/time-limit time-limit
-                  (gen/seq (cycle (concat (repeat 20 {:type :info, :f 
-                                          [(gen/sleep 300)])))))
+                   (gen/seq (cycle (concat (repeat 20 {:type :info, :f 
+                                           [(gen/sleep 300)])))))
 (defn kill-jobmanagers-gen
   (fgen/time-limit (+ time-limit job-submit-grace-period)
-                  (gen/seq (cons (gen/sleep job-submit-grace-period)
-                                 (cycle [{:type :info, :f 
+                   (gen/seq (cons (gen/sleep job-submit-grace-period)
+                                  (cycle [{:type :info, :f 
 (defn fail-name-node-during-recovery
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj 
index 3fd9f961e13..50d0075ca35 100644
--- a/flink-jepsen/src/jepsen/flink/utils.clj
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -15,7 +15,10 @@
 ;; limitations under the License.
 (ns jepsen.flink.utils
-  (:require [clojure.tools.logging :refer :all]))
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]]
+            [jepsen.os.debian :as debian]))
 (defn retry
   "Runs a function op and retries on exception.
@@ -46,3 +49,41 @@
          (Thread/sleep delay)
          (recur op (assoc keys :retries (dec retries))))
        (success r)))))
+;;; runit process supervisor (http://smarden.org/runit/)
+(def runit-version "2.1.2-3")
+(defn- install-process-supervisor!
+  "Installs the process supervisor."
+  []
+  (debian/install {:runit runit-version}))
+(defn create-supervised-service!
+  "Registers a service with the process supervisor and starts it."
+  [service-name cmd]
+  (let [service-dir (str "/etc/sv/" service-name)
+        run-script (str service-dir "/run")]
+    (info "Create supervised service" service-name)
+    (c/su
+      (install-process-supervisor!)
+      (c/exec :mkdir :-p service-dir)
+      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
+                                               "exec 2>&1"
+                                               (str "exec " cmd)]) :> 
+      (c/exec :chmod :+x run-script)
+      (c/exec :ln :-sfT service-dir (str "/etc/service/" service-name)))))
+(defn stop-supervised-service!
+  "Stops a service and removes it from supervision."
+  [service-name]
+  (info "Stop supervised service" service-name)
+  (c/su
+    (c/exec :rm :-f (str "/etc/service/" service-name))))
+(defn stop-all-supervised-services!
+  "Stops and removes all services from supervision if any."
+  []
+  (info "Stop all supervised services.")
+  (c/su
+    (c/exec :rm :-f (c/lit (str "/etc/service/*")))))
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj 
index 7389bbc15e9..c27d751e69e 100644
--- a/flink-jepsen/test/jepsen/flink/checker_test.clj
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -25,46 +25,91 @@
                  {:type :invoke, :f :job-running?, :value nil, :process 0, 
:time 127443701575}
                  {:type :ok, :f :job-running?, :value false, :process 0, :time 
                  {:type :invoke, :f :job-running?, :value nil, :process 0, 
:time 127453553463}
-                 {:type :ok, :f :job-running?, :value true, :process 0, :time 
-    (is (= (get-job-running-history history) [false true]))))
+                 {:type :ok, :f :job-running?, :value true, :process 0, :time 
+                 {:type :info, :f :job-running?, :value nil, :process 0, :time 
+    (is (= (get-job-running-history history) [false true false]))))
 (deftest job-running-checker-test
   (let [checker (job-running-checker)
         test {}
-        model (job-running-within-grace-period 3 60)
+        model (job-running-within-grace-period 3 60 10)
         opts {}
         check (fn [history] (checker/check checker test model history opts))]
-    (testing "Job is not running after grace period."
-      (is (= (:valid? (check
-                        [{:type :info, :f :kill-task-managers, :process 
:nemesis, :time 0, :value [""]}
-                         {:type :ok, :f :job-running?, :value false, :process 
0, :time 60000000001}])) false)))
-    (testing "Job is running after grace period."
-      (is (= (:valid? (check
-                        [{:type :info, :f :kill-task-managers, :process 
:nemesis, :time 0, :value [""]}
-                         {:type :ok, :f :job-running?, :value true, :process 
0, :time 60000000001}])) true)))
+    (testing "Model should be inconsistent if job is not running after grace 
+      (let [result (check
+                     [{:type :info, :f :kill-task-managers, :process :nemesis, 
:time 0, :value [""]}
+                      {:type :ok, :f :job-running?, :value false, :process 0, 
:time 60000000001}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is not running." (-> result :final-model :msg)))))
+    (testing "Model should be consistent if job is running after grace period."
+      (is (= true (:valid? (check
+                             [{:type :info, :f :kill-task-managers, :process 
:nemesis, :time 0, :value [""]}
+                              {:type :ok, :f :job-running?, :value true, 
:process 0, :time 60000000001}
+                              {:type :ok, :f :job-running?, :value true, 
:process 0, :time 60000000002}
+                              {:type :ok, :f :job-running?, :value true, 
:process 0, :time 60000000003}])))))
     (testing "Should tolerate non-running job during failures."
-      (is (= (:valid? (check
-                        [{:type :info, :f :partition-start, :process :nemesis, 
:time -1}
-                         {:type :info, :f :partition-start, :process :nemesis, 
:time 0, :value "Cut off [...]"}
-                         {:type :ok, :f :job-running?, :value false, :process 
0, :time 60000000001}
-                         {:type :info, :f :partition-stop, :process :nemesis, 
:time 60000000002}
-                         {:type :info, :f :partition-stop, :process :nemesis, 
:time 60000000003, :value "fully connected"}
-                         {:type :ok, :f :job-running?, :value true, :process 
0, :time 60000000004}])) true)))
-    (testing "Should respect healthy threshold."
-      (is (= (:valid? (check
-                        [{:type :ok, :f :job-running?, :value true, :process 
0, :time 0}
-                         {:type :ok, :f :job-running?, :value true, :process 
0, :time 1}
-                         {:type :ok, :f :job-running?, :value true, :process 
0, :time 2}
-                         {:type :ok, :f :job-running?, :value false, :process 
0, :time 60000000003}
-                         {:type :ok, :f :job-running?, :value true, :process 
0, :time 60000000004}])) true))
-      (is (= (:valid? (check
-                        [{:type :ok, :f :job-running?, :value true, :process 
0, :time 0}
-                         {:type :ok, :f :job-running?, :value true, :process 
0, :time 1}
-                         {:type :ok, :f :job-running?, :value false, :process 
0, :time 60000000002}
-                         {:type :ok, :f :job-running?, :value true, :process 
0, :time 60000000004}])) false)))
-    (testing "Job was not deployed successfully."
-      (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, 
:process 45, :time 239150413307}
-                              {:type :info, :f :job-running?, :value nil, 
:process 45, :time 239150751938, :error "indeterminate: Assert failed: 
job-id"}])) false)))))
+      (is (= true (:valid? (check
+                             [{:type :info, :f :partition-start, :process 
:nemesis, :time -1}
+                              {:type :info, :f :partition-start, :process 
:nemesis, :time 0, :value "Cut off [...]"}
+                              {:type :ok, :f :job-running?, :value false, 
:process 0, :time 60000000001}
+                              {:type :info, :f :partition-stop, :process 
:nemesis, :time 60000000002}
+                              {:type :info, :f :partition-stop, :process 
:nemesis, :time 60000000003, :value "fully connected"}
+                              {:type :ok, :f :job-running?, :value true, 
:process 0, :time 60000000004}
+                              {:type :ok, :f :job-running?, :value true, 
:process 0, :time 60000000005}
+                              {:type :ok, :f :job-running?, :value true, 
:process 0, :time 60000000006}])))))
+    (testing "Should not tolerate non-running job without a cause."
+      (let [result (check
+                     [{:type :ok, :f :job-running?, :value true, :process 0, 
:time 0}
+                      {:type :ok, :f :job-running?, :value true, :process 0, 
:time 1}
+                      {:type :ok, :f :job-running?, :value false, :process 0, 
:time 60000000001}
+                      {:type :ok, :f :job-running?, :value true, :process 0, 
:time 60000000002}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is not running." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if job submission was unsuccessful."
+      (let [result (check [{:type :invoke, :f :job-running?, :value nil, 
:process 0, :time 239150413307}
+                           {:type :info, :f :job-running?, :value nil, 
:process 0, :time 239150751938, :error "indeterminate: Assert failed: 
+        (is (= false (:valid? result)))))
+    (testing "Model should be inconsistent if the job status cannot be polled, 
i.e., if the cluster is unavailable."
+      (let [result (check [{:type :fail, :f :job-running?, :value nil, 
:process 0, :time 0 :error "Error"}
+                           {:type :fail, :f :job-running?, :value nil, 
:process 0, :time 60000000001 :error "Error"}
+                           {:type :fail, :f :job-running?, :value nil, 
:process 0, :time 60000000002 :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Should tolerate non-running job after cancellation."
+      (is (= true (:valid? (check [{:type :invoke, :f :cancel-job, :value nil, 
:process 0, :time 0}
+                                   {:type :ok, :f :cancel-job, :value true, 
:process 0, :time 1}
+                                   {:type :ok, :f :job-running?, :value true, 
:process 0, :time 2}
+                                   {:type :ok, :f :job-running?, :value false, 
:process 0, :time 3}])))))
+    (testing "Model should be inconsistent if job is running after 
+      (let [result (check [{:type :invoke, :f :cancel-job, :value nil, 
:process 0, :time 0}
+                           {:type :ok, :f :cancel-job, :value true, :process 
0, :time 1}
+                           {:type :ok, :f :job-running?, :value true, :process 
0, :time 10000000002}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is running after cancellation." (-> result :final-model 
+    (testing "Model should be inconsistent if Flink cluster is not available 
at the end."
+      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 
0, :time 0}
+                           {:type :ok, :f :job-running?, :value true, :process 
0, :time 1}
+                           {:type :ok, :f :job-running?, :value true, :process 
0, :time 2}
+                           {:type :fail, :f :job-running?, :value nil, 
:process 0, :time 60000000003, :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if Flink cluster is not available 
after job cancellation."
+      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 
0, :time 0}
+                           {:type :invoke, :f :cancel-job, :value nil, 
:process 0, :time 1}
+                           {:type :ok, :f :cancel-job, :value true, :process 
0, :time 2}
+                           {:type :fail, :f :job-running?, :value nil, 
:process 0, :time 60000000001, :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Should throw AssertionError if job cancelling operation failed."
+      (is (thrown-with-msg? AssertionError
+                            #":cancel-job must not fail"
+                            (check [{:type :fail, :f :cancel-job, :value nil, 
:process 0, :time 0}]))))
+    (testing "Should tolerate non-running job if grace period has not passed."
+      (is (= true (:valid? (check [{:type :invoke, :f :job-running?, :value 
nil, :process 0, :time 0}
+                                   {:type :ok, :f :job-running?, :value false, 
:process 0, :time 1}
+                                   {:type :ok, :f :job-running?, :value true, 
:process 0, :time 2}
+                                   {:type :ok, :f :job-running?, :value true, 
:process 0, :time 3}
+                                   {:type :ok, :f :job-running?, :value true, 
:process 0, :time 4}])))))))
 (deftest safe-inc-test
   (is (= (safe-inc nil) 1))
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj 
index b4373bfb124..a73c936d08d 100644
--- a/flink-jepsen/test/jepsen/flink/client_test.clj
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -17,7 +17,8 @@
 (ns jepsen.flink.client-test
   (:require [clojure.test :refer :all]
             [clj-http.fake :as fake]
-            [jepsen.flink.client :refer :all]))
+            [jepsen.flink.client :refer :all])
+  (:import (clojure.lang ExceptionInfo)))
 (deftest read-url-test
   (is (= "https://www.asdf.de"; (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 
0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 
0x73 0x64 0x66 0x2E 0x64 0x65])))))
@@ -25,13 +26,52 @@
 (deftest job-running?-test
-     (fn [request] {:status  200
-                    :headers {}
-                    :body    
"{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window 
 Socket Stream -> Flat 
 ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: 
Print to Std. 
 ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; 
Sink: Print to Std. 
 Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
-     "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d";
-     (fn [request] {:status  200
-                    :headers {}
-                    :body    
"{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window 
 Socket Stream -> Flat 
 ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: 
Print to Std. 
 ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; 
Sink: Print to Std. 
 Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})}
+     (fn [_] {:status 200
+              :body   
"{\"jid\":\"a718f168ec6be8eff1345a17bf64196c\",\"name\":\"Socket Window 
 Socket Stream -> Flat 
 ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: 
Print to Std. 
 ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; 
Sink: Print to Std. 
 Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
+     "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2";
+     (fn [_] {:status 200
+              :body   
"{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window 
 Socket Stream -> Flat 
 ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: 
Print to Std. 
 ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; 
Sink: Print to Std. 
 Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
+     "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7";
+     (fn [_] {:status 404})}
     (is (= (job-running? "http://localhost:8081"; 
"a718f168ec6be8eff1345a17bf64196c") true))
-    (is (= (job-running? "http://localhost:8081"; 
"a718f168ec6be8eff1345a17bf64196d") false))))
+    (is (= (job-running? "http://localhost:8081"; 
"54ae4d8ec01d85053d7eb5d139492df2") false))
+    (is (= (job-running? "http://localhost:8081"; 
"ec3a61df646e665d31899bb26aba10b7") false))))
+(deftest cancel-job!-test
+  (fake/with-fake-routes
+    {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c";
+     {:patch (fn [_] {:status 202})}
+     "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2";
+     {:patch (fn [_] {:status 404})}
+     "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7";
+     {:patch (fn [_] {:status 503})}}
+    (testing "Successful job cancellation."
+      (is (= true (@#'jepsen.flink.client/cancel-job!
+                    "http://localhost:8081";
+                    "a718f168ec6be8eff1345a17bf64196c"))))
+    (testing "Job not found."
+      (is (= false (@#'jepsen.flink.client/cancel-job!
+                     "http://localhost:8081";
+                     "54ae4d8ec01d85053d7eb5d139492df2"))))
+    (testing "Throw if HTTP status code is 503."
+      (is (thrown-with-msg? ExceptionInfo
+                            #"Job cancellation unsuccessful"
+                            (@#'jepsen.flink.client/cancel-job!
+                              "http://localhost:8081";
+                              "ec3a61df646e665d31899bb26aba10b7"))))))
+(deftest dispatch-operation-test
+  (let [op {:type :invoke, :f :job-running?, :value nil}
+        test-fn (constantly 1)]
+    (testing "Dispatching operation completes normally."
+      (is (= {:type :ok :value 1} (select-keys (dispatch-operation op 
(test-fn)) [:type :value]))))
+    (testing "Returned operation should be of type :fail and have a nil value 
on exception."
+      (is (= {:type :fail :value nil :error "expected"} (select-keys 
(dispatch-operation op (throw (Exception. "expected"))) [:type :value 
diff --git a/flink-jepsen/test/jepsen/flink/nemesis_test.clj 
new file mode 100644
index 00000000000..488631ffea7
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
@@ -0,0 +1,32 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns jepsen.flink.nemesis-test
+  (:require [clojure.test :refer :all])
+  (:require [jepsen.flink.nemesis :refer :all]))
+(deftest inc-by-factor-test
+  (testing "Should not increase if factor is 1."
+    (is (= 10 (@#'jepsen.flink.nemesis/inc-by-factor 10 1))))
+  (testing "Should increase by factor."
+    (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.5))))
+  (testing "Should round down."
+    (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.52))))
+  (testing "Should throw if factor < 1."
+    (is (thrown? AssertionError (@#'jepsen.flink.nemesis/inc-by-factor 1 0)))))


> HA end-to-end/Jepsen tests for standby Dispatchers
> --------------------------------------------------
>                 Key: FLINK-10311
>                 URL: https://issues.apache.org/jira/browse/FLINK-10311
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
> We should add end-to-end or Jepsen tests to verify the HA behaviour when 
> using multiple standby Dispatchers. In particular, we should verify that jobs 
> get properly cleaned up after they have finished successfully (see FLINK-10255 and 
> FLINK-10011):
> 1. Test that a standby Dispatcher does not affect job execution and resource 
> cleanup
> 2. Test that a standby Dispatcher recovers all submitted jobs after the 
> leader loses leadership

