[ https://issues.apache.org/jira/browse/FLINK-10311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16632263#comment-16632263 ]

ASF GitHub Bot commented on FLINK-10311:
----------------------------------------

asfgit closed pull request #6712: [FLINK-10311][tests] HA end-to-end/Jepsen 
tests for standby Dispatchers
URL: https://github.com/apache/flink/pull/6712
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-jepsen/README.md b/flink-jepsen/README.md
index 934324607f0..a3e2668c26b 100644
--- a/flink-jepsen/README.md
+++ b/flink-jepsen/README.md
@@ -5,10 +5,11 @@ distributed coordination of Apache Flink®.
 
 ## Test Coverage
 Jepsen is a framework built to test the behavior of distributed systems
-under faults. The tests in this particular project deploy Flink on either YARN or Mesos, submit a
+under faults. The tests in this particular project deploy Flink on YARN, Mesos, or as a standalone cluster, submit a
 job, and examine the availability of the job after injecting faults.
 A job is said to be available if all the tasks of the job are running.
 The faults that can be currently introduced to the Flink cluster include:
+
 * Killing of TaskManager/JobManager processes
 * Stopping HDFS NameNode
 * Network partitions
diff --git a/flink-jepsen/project.clj b/flink-jepsen/project.clj
index 78935d71187..8c3e8451536 100644
--- a/flink-jepsen/project.clj
+++ b/flink-jepsen/project.clj
@@ -18,6 +18,7 @@
   :license {:name "Apache License"
             :url  "http://www.apache.org/licenses/LICENSE-2.0"}
   :main jepsen.flink.flink
+  :aot [jepsen.flink.flink]
   :dependencies [[org.clojure/clojure "1.9.0"],
                  [cheshire "5.8.0"]
                  [clj-http "3.8.0"]
diff --git a/flink-jepsen/scripts/run-tests.sh b/flink-jepsen/scripts/run-tests.sh
index e44812402f6..a2b256b6f6a 100755
--- a/flink-jepsen/scripts/run-tests.sh
+++ b/flink-jepsen/scripts/run-tests.sh
@@ -36,8 +36,15 @@ do
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-session
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-session
   lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-session
+
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode yarn-job
   lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode yarn-job
   lein run test "${common_jepsen_args[@]}" --nemesis-gen fail-name-node-during-recovery --deployment-mode yarn-job
+
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-task-managers --deployment-mode mesos-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode mesos-session
+
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --deployment-mode standalone-session
+  lein run test "${common_jepsen_args[@]}" --nemesis-gen kill-job-managers --client-gen cancel-job --deployment-mode standalone-session
   echo
 done
diff --git a/flink-jepsen/src/jepsen/flink/checker.clj b/flink-jepsen/src/jepsen/flink/checker.clj
index 02cc863bef5..7e437e9d628 100644
--- a/flink-jepsen/src/jepsen/flink/checker.clj
+++ b/flink-jepsen/src/jepsen/flink/checker.clj
@@ -48,59 +48,126 @@
   [[_ v]]
   (zero? v))
 
+(defn- set-job-not-running
+  [model] (assoc model :healthy-count 0))
+
+(defn- track-job-running
+  [model]
+  (update model :healthy-count inc))
+
+(defn- elapsed-seconds
+  [start end]
+  (ju/nanos->secs (- end start)))
+
+(defn- should-cluster-be-healthy?
+  [model op]
+  (let [{:keys [active-nemeses last-failure job-recovery-grace-period]} model]
+    (and
+      (not (nemeses-active? active-nemeses))
+      (> (elapsed-seconds last-failure (:time op)) job-recovery-grace-period))))
+
+(defn- start-fault
+  [model op]
+  (let [{:keys [active-nemeses]} model]
+    (assoc
+      model
+      :active-nemeses (update active-nemeses
+                              (strip-op-suffix op)
+                              safe-inc))))
+
+(defn- stop-fault
+  [model op]
+  (let [{:keys [active-nemeses]} model]
+    (assoc
+      model
+      :active-nemeses (dissoc-if zero-value?
+                                 (update active-nemeses (strip-op-suffix op) dec))
+      :last-failure (:time op))))
+
+(defn- job-allowed-to-be-running?
+  [model op]
+  (let [{:keys [job-canceled? job-canceled-time job-cancellation-grace-period]} model
+        now (:time op)]
+    (cond
+      (not job-canceled?) true
+      :else (> job-cancellation-grace-period (elapsed-seconds job-canceled-time now)))))
+
+(defn- handle-job-running?-op
+  "Returns the new model for an op {:f :job-running? ...}."
+  [model op]
+  (assert (#{:ok :fail :info} (:type op)) "Unexpected type")
+  (let [{:keys [job-canceled?]} model
+        job-running (:value op)
+        request-failed (#{:info :fail} (:type op))]
+    (if (and request-failed
+             (should-cluster-be-healthy? model op))
+      (model/inconsistent "Cluster is not running.")
+      (if job-running                                       ; cluster is running, check if job is running
+        (if (job-allowed-to-be-running? model op)           ; job is running but is it supposed to be running?
+          (track-job-running model)
+          (model/inconsistent
+            "Job is running after cancellation."))
+        (if (and                                            ; job is not running
+              (should-cluster-be-healthy? model op)
+              (not job-canceled?))
+          (model/inconsistent "Job is not running.")        ; job is not running but it should be running because grace period passed
+          (set-job-not-running model))))))
+
 (defrecord
   JobRunningWithinGracePeriod
-  ^{:doc "A Model which is consistent iff. the Flink job became available within
+  ^{:doc "A Model which is consistent if the Flink job and the Flink cluster became available within
   `job-recovery-grace-period` seconds after the last fault injected by the nemesis.
   Note that some faults happen at a single point in time (e.g., killing of processes). Other faults,
   such as network splits, happen during a period of time, and can thus be interleaving. As long as
-  there are active faults, the job is allowed not to be available."}
+  there are active faults, the job and the cluster are allowed to be unavailable.
+
+  Note that this model assumes that the client dispatches the operations reliably, i.e., in case of
+  exceptions, the operations are retried or failed fatally."}
   [active-nemeses                                           ; stores active failures
    healthy-count                                            ; how many consecutive times was the job running?
    last-failure                                             ; timestamp when the last failure was injected/ended
    healthy-threshold                                        ; after how many times is the job considered healthy
-   job-recovery-grace-period]                               ; after how many seconds should the job be recovered
+   job-recovery-grace-period                                ; after how many seconds should the job be recovered
+   job-cancellation-grace-period                            ; after how many seconds should the job be canceled?
+   job-canceled?                                            ; is the job canceled?
+   job-canceled-time]                                       ; timestamp of cancellation
   Model
   (step [this op]
     (case (:process op)
       :nemesis (cond
                  (nil? (:value op)) this
-                 (stoppable-op? op) (assoc
-                                      this
-                                      :active-nemeses (update active-nemeses
-                                                              (strip-op-suffix op)
-                                                              safe-inc))
-                 (stop-op? op) (assoc
-                                 this
-                                 :active-nemeses (dissoc-if zero-value?
-                                                            (update active-nemeses (strip-op-suffix op) dec))
-                                 :last-failure (:time op))
+                 (stoppable-op? op) (start-fault this op)
+                 (stop-op? op) (stop-fault this op)
                  :else (assoc this :last-failure (:time op)))
-      (case (:f op)
-        :job-running? (case (:type op)
-                        :info this                          ; ignore :info operations
-                        :fail this                          ; ignore :fail operations
-                        :invoke this                        ; ignore :invoke operations
-                        :ok (if (:value op)                 ; check if job is running
-                              (assoc                        ; job is running
-                                this
-                                :healthy-count
-                                (inc healthy-count))
-                              (if (and                      ; job is not running
-                                    (not (nemeses-active? active-nemeses))
-                                    (< healthy-count healthy-threshold)
-                                    (> (ju/nanos->secs (- (:time op) last-failure)) job-recovery-grace-period))
-                                ; job is not running but it should be running
-                                ; because grace period passed
-                                (model/inconsistent "Job is not running.")
-                                (conj this
-                                      [:healthy-count 0]))))
-        ; ignore other client operations
-        this))))
+      (if (= :invoke (:type op))
+        this                                                ; ignore :invoke operations
+        (case (:f op)
+          :job-running? (handle-job-running?-op this op)
+          :cancel-job (do
+                        (assert (= :ok (:type op)) ":cancel-job must not fail")
+                        (assoc this :job-canceled? true :job-canceled-time (:time op)))
+          ; ignore other client operations
+          this)))))
 
 (defn job-running-within-grace-period
-  [job-running-healthy-threshold job-recovery-grace-period]
-  (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period))
+  ([job-running-healthy-threshold job-recovery-grace-period job-cancellation-grace-period]
+   (JobRunningWithinGracePeriod. {} 0 nil job-running-healthy-threshold job-recovery-grace-period job-cancellation-grace-period false nil))
+  ([job-running-healthy-threshold job-recovery-grace-period]
+   (job-running-within-grace-period job-running-healthy-threshold job-recovery-grace-period 10)))
+
+(defn get-job-running-history
+  [history]
+  (->>
+    history
+    (remove #(= (:process %) :nemesis))
+    (remove #(= (:type %) :invoke))
+    (map :value)
+    (map boolean)
+    (remove nil?)))
+
+(defn- healthy?
+  [model]
+  (>= (:healthy-count model) (:healthy-threshold model)))
 
 (defn job-running-checker
   []
@@ -111,18 +178,11 @@
             result-map (conj {}
                              (find test :nemesis-gen)
                              (find test :deployment-mode))]
-        (if (or (model/inconsistent? final) (zero? (:healthy-count final 0)))
-          (into result-map {:valid? false
-                            :error  (:msg final)})
+        (if (or (model/inconsistent? final)
+                (and
+                  (not (healthy? final))
+                  (not (:job-canceled? final))))
+          (into result-map {:valid?      false
+                            :final-model final})
           (into result-map {:valid?      true
                             :final-model final}))))))
-
-(defn get-job-running-history
-  [history]
-  (->>
-    history
-    (remove #(= (:process %) :nemesis))
-    (remove #(= (:type %) :invoke))
-    (map :value)
-    (map boolean)
-    (remove nil?)))
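The `:job-running?` branch of `JobRunningWithinGracePeriod` reduces to a small decision table. The following Clojure sketch restates the logic of `handle-job-running?-op` as a pure function so the cases can be checked in isolation. It is not part of the PR: the names `classify`, `cluster-should-be-healthy?` and `job-allowed-to-run?` and the returned keywords are hypothetical; it returns keywords instead of Knossos models; it assumes a fault is active iff `active-nemeses` is non-empty (the real code calls `nemeses-active?`, which is not shown in the diff); and it assumes `:time` is in nanoseconds, as in Jepsen histories.

```clojure
;; Hypothetical sketch, not part of the PR: mirrors the decisions of
;; handle-job-running?-op without Knossos. Timestamps are nanoseconds.

(defn- elapsed-seconds
  "Seconds elapsed between two nanosecond timestamps."
  [start end]
  (/ (- end start) 1e9))

(defn- cluster-should-be-healthy?
  "True if no fault is active (assumed: active-nemeses is empty) and the
  recovery grace period has passed since the last fault."
  [{:keys [active-nemeses last-failure job-recovery-grace-period]} now]
  (and (empty? active-nemeses)
       (> (elapsed-seconds last-failure now) job-recovery-grace-period)))

(defn- job-allowed-to-run?
  "A canceled job may still be reported as running until the
  cancellation grace period has passed."
  [{:keys [job-canceled? job-canceled-time job-cancellation-grace-period]} now]
  (or (not job-canceled?)
      (> job-cancellation-grace-period
         (elapsed-seconds job-canceled-time now))))

(defn classify
  "Returns a keyword naming the outcome for a completed :job-running? op."
  [model {op-type :type, value :value, now :time}]
  (let [request-failed? (contains? #{:info :fail} op-type)]
    (cond
      ;; the REST request failed although the cluster should be up
      (and request-failed? (cluster-should-be-healthy? model now))
      :inconsistent-cluster-not-running

      ;; the job is reported as running: is it still allowed to run?
      value
      (if (job-allowed-to-run? model now)
        :track-running
        :inconsistent-running-after-cancellation)

      ;; the job is not running although it should be
      (and (cluster-should-be-healthy? model now)
           (not (:job-canceled? model)))
      :inconsistent-job-not-running

      ;; otherwise the consecutive healthy count is reset
      :else
      :reset-healthy-count)))

(def model
  {:active-nemeses {} :last-failure 0 :job-recovery-grace-period 180
   :job-canceled? false :job-canceled-time nil
   :job-cancellation-grace-period 10})

(classify model {:type :ok :value false :time (* 200 1e9)})
;; => :inconsistent-job-not-running
```

In this reading, a failed or indeterminate request only counts against the cluster once no fault is active and the recovery grace period has passed; before that it is treated like a "job not running" observation and merely resets the healthy count.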
diff --git a/flink-jepsen/src/jepsen/flink/client.clj b/flink-jepsen/src/jepsen/flink/client.clj
index 905dc48911f..1ab987bd704 100644
--- a/flink-jepsen/src/jepsen/flink/client.clj
+++ b/flink-jepsen/src/jepsen/flink/client.clj
@@ -47,6 +47,12 @@
   (info "Waiting for path" path "in ZK.")
   (wait-for-zk-operation zk-client zk/exists path))
 
+(defn get-only-application-id
+  [coll]
+  (assert (= 1 (count coll)) (str "Expected 1 application id, got " coll ". "
+                                  "Failed to deploy the Flink cluster, or there are lingering Flink clusters."))
+  (first coll))
+
 (defn wait-for-children-to-exist
   [zk-client path]
   (wait-for-zk-operation zk-client zk/children path))
@@ -60,7 +66,7 @@
     (->
       (wait-for-children-to-exist zk-client "/flink")
       (deref)
-      (first))))
+      (get-only-application-id))))
 
 (defn watch-node-bytes
   [zk-client path callback]
@@ -97,54 +103,100 @@
     :jobs
     (map :id)))
 
-(defn get-job-details!
-  [base-url job-id]
-  (assert base-url)
-  (assert job-id)
-  (let [job-details (->
-                      (http/get (str base-url "/jobs/" job-id) {:as :json})
-                      :body)]
-    (assert (:vertices job-details) "Job does not have vertices")
-    job-details))
-
 (defn job-running?
   [base-url job-id]
-  (->>
-    (get-job-details! base-url job-id)
-    :vertices
-    (map :status)
-    (every? #(= "RUNNING" %))))
+  (let [response (http/get (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
+        body (:body response)
+        error (:errors body)]
+    (cond
+      (http/missing? response) false
+      (not (http/success? response)) (throw (ex-info "Could not determine if job is running" {:job-id job-id :error error}))
+      :else (do
+              (assert (:vertices body) "Job does not have vertices")
+              (->>
+                body
+                :vertices
+                (map :status)
+                (every? #(= "RUNNING" %)))))))
+
+(defn- cancel-job!
+  "Cancels the specified job. Returns true if the job could be canceled.
+  Returns false if the job does not exist. Throws an exception if the HTTP status
+  is not successful."
+  [base-url job-id]
+  (let [response (http/patch (str base-url "/jobs/" job-id) {:as :json :throw-exceptions false})
+        error (-> response :body :errors)]
+    (cond
+      (http/missing? response) false
+      (not (http/success? response)) (throw (ex-info "Job cancellation unsuccessful" {:job-id job-id :error error}))
+      :else true)))
+
+(defmacro dispatch-operation
+  [op & body]
+  `(try
+     (assoc ~op :type :ok :value ~@body)
+     (catch Exception e# (do
+                           (warn e# "An exception occurred while running" (quote ~@body))
+                           (assoc ~op :type :fail :error (.getMessage e#))))))
+
+(defmacro dispatch-operation-or-fatal
+  "Dispatches op by evaluating body, retrying a number of times if needed.
+  Fails fatally if all retries are exhausted."
+  [op & body]
+  `(assoc ~op :type :ok :value (fu/retry (fn [] ~@body) :fallback (fn [e#]
+                                                                    (fatal e# "Required operation did not succeed" (quote ~@body))
+                                                                    (System/exit 1)))))
+
+(defn- dispatch-rest-operation!
+  [rest-url job-id op]
+  (assert job-id)
+  (if-not rest-url
+    (assoc op :type :fail :error "Have not determined REST URL yet.")
+    (case (:f op)
+      :job-running? (dispatch-operation op (fu/retry
+                                             (partial job-running? rest-url job-id)
+                                             :retries 3
+                                             :fallback #(throw %)))
+      :cancel-job (dispatch-operation-or-fatal op (cancel-job! rest-url job-id)))))
 
 (defrecord Client
-  [deploy-cluster! closer rest-url init-future job-id]
+  [deploy-cluster!                                          ; function that starts a non-standalone cluster and submits the job
+   closer                                                   ; function that closes the ZK client
+   rest-url                                                 ; atom storing the current rest-url
+   init-future                                              ; future that completes if rest-url is set to an initial value
+   job-id                                                   ; atom storing the job-id
+   job-submitted?]                                          ; Has the job already been submitted? Used to avoid re-submission if the client is re-opened.
   client/Client
-  (open! [this test node]
+  (open! [this test _]
+    (info "Open client.")
     (let [{:keys [rest-url-atom closer init-future]} (make-job-manager-url test)]
-      (assoc this :closer closer :rest-url rest-url-atom :init-future init-future :job-id (atom nil))))
-
-  (setup! [this test] this)
-
-  (invoke! [this test op]
-    (case (:f op)
-      :submit (do
-                (deploy-cluster! test)
-                (deref init-future)
-                (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
-                                     :fallback (fn [e] (do
-                                                         (fatal e "Could not get running jobs.")
-                                                         (System/exit 1))))
-                      num-jobs (count jobs)]
-                  (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
-                  (reset! job-id (first jobs)))
-                (assoc op :type :ok))
-      :job-running? (let [base-url @rest-url]
-                      (if base-url
-                        (try
-                          (assoc op :type :ok :value (job-running? base-url @job-id))
-                          (catch Exception e (do
-                                               (warn e "Get job details from" base-url "failed.")
-                                               (assoc op :type :fail))))
-                        (assoc op :type :fail :value "Cluster not deployed yet.")))))
-
-  (teardown! [this test])
-  (close! [this test] (closer)))
+      (assoc this :closer closer
+                  :rest-url rest-url-atom
+                  :init-future init-future)))
+
+  (setup! [_ test]
+    (info "Setup client.")
+    (when (compare-and-set! job-submitted? false true)
+      (deploy-cluster! test)
+      (deref init-future)
+      (let [jobs (fu/retry (fn [] (list-jobs! @rest-url))
+                           :fallback (fn [e]
+                                       (fatal e "Could not get running jobs.")
+                                       (System/exit 1)))
+            num-jobs (count jobs)
+            job (first jobs)]
+        (assert (= 1 num-jobs) (str "Expected 1 job, was " num-jobs))
+        (info "Submitted job" job)
+        (reset! job-id job))))
+
+  (invoke! [_ _ op]
+    (dispatch-rest-operation! @rest-url @job-id op))
+
+  (teardown! [_ _])
+  (close! [_ _]
+    (info "Closing client.")
+    (closer)))
+
+(defn create-client
+  [deploy-cluster!]
+  (Client. deploy-cluster! nil nil nil (atom nil) (atom false)))
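The REST handling in `client.clj` distinguishes three outcomes for `GET /jobs/<job-id>`: unknown job, error status, and a job whose vertices can be inspected. The sketch below restates `job-running?` and the `dispatch-operation` macro as plain functions over a response map, which makes the mapping to Jepsen's `:ok`/`:fail` operation types easy to test. It is a hypothetical illustration, not the PR's code: `job-running-from-response` and `dispatch` are invented names, and it assumes that clj-http's `missing?` means HTTP 404 and `success?` means a 2xx status.

```clojure
;; Hypothetical sketch, not part of the PR: pure versions of job-running?
;; and the dispatch-operation macro. A response is a plain map
;; {:status ..., :body ...}; 404 stands in for clj-http's missing? and
;; 2xx for success? (an assumption about those predicates).

(defn job-running-from-response
  "Interprets a response of GET /jobs/<job-id>. Returns false if the job is
  unknown, true iff every vertex is RUNNING, and throws on other errors."
  [job-id {:keys [status body]}]
  (cond
    (= 404 status) false
    (not (<= 200 status 299))
    (throw (ex-info "Could not determine if job is running"
                    {:job-id job-id :error (:errors body)}))
    :else (do
            (assert (:vertices body) "Job does not have vertices")
            (every? #(= "RUNNING" (:status %)) (:vertices body)))))

(defn dispatch
  "Function version of dispatch-operation: completes op as :ok with the
  value of (f), or as :fail with the exception message."
  [op f]
  (try
    (assoc op :type :ok :value (f))
    (catch Exception e
      (assoc op :type :fail :error (.getMessage e)))))

(def op {:type :invoke :f :job-running? :value nil})

;; A 503 from the REST endpoint becomes a :fail operation.
(:type (dispatch op #(job-running-from-response
                       "job-1" {:status 503 :body {:errors ["unavailable"]}})))
;; => :fail
```

As in the PR, an unknown job is reported as "not running" rather than as a failed request, while any other non-2xx status surfaces as a `:fail` operation that the checker then judges against the grace periods.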
diff --git a/flink-jepsen/src/jepsen/flink/db.clj b/flink-jepsen/src/jepsen/flink/db.clj
index 79ed8a45b4d..e0f5ff856d4 100644
--- a/flink-jepsen/src/jepsen/flink/db.clj
+++ b/flink-jepsen/src/jepsen/flink/db.clj
@@ -36,21 +36,22 @@
 (def conf-file (str install-dir "/conf/flink-conf.yaml"))
 (def masters-file (str install-dir "/conf/masters"))
 
-(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.5.0/flink-1.5.0-bin-hadoop28-scala_2.11.tgz")
+(def default-flink-dist-url "https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop28-scala_2.11.tgz")
 (def hadoop-dist-url "https://archive.apache.org/dist/hadoop/common/hadoop-2.8.3/hadoop-2.8.3.tar.gz")
 (def deb-zookeeper-package "3.4.9-3+deb8u1")
 (def deb-mesos-package "1.5.0-2.0.2")
 (def deb-marathon-package "1.6.322")
 
 (def taskmanager-slots 1)
-(def master-count 1)
 
 (defn flink-configuration
-  [test]
+  [test node]
   {:high-availability                  "zookeeper"
    :high-availability.zookeeper.quorum (zookeeper-quorum test)
    :high-availability.storageDir       (str (:ha-storage-dir test) "/ha")
+   :jobmanager.rpc.address             node
    :state.savepoints.dir               (str (:ha-storage-dir test) "/savepoints")
+   :rest.address                       node
    :rest.port                          8081
    :rest.bind-address                  "0.0.0.0"
    :taskmanager.numberOfTaskSlots      taskmanager-slots
@@ -59,23 +60,17 @@
    :state.backend.local-recovery       "false"
    :taskmanager.registration.timeout   "30 s"})
 
-(defn master-nodes
-  [test]
-  (take master-count (sort (:nodes test))))
-
 (defn write-configuration!
-  "Writes the flink-conf.yaml and masters file to the flink conf directory"
-  [test]
+  "Writes the flink-conf.yaml to the flink conf directory"
+  [test node]
   (let [c (clojure.string/join "\n" (map (fn [[k v]] (str (name k) ": " v))
-                                         (seq (flink-configuration test))))
-        m (clojure.string/join "\n" (master-nodes test))]
+                                         (seq (flink-configuration test node))))]
     (c/exec :echo c :> conf-file)
-    (c/exec :echo m :> masters-file)
     ;; TODO: write log4j.properties properly
     (c/exec (c/lit (str "sed -i'.bak' -e '/log4j.rootLogger=/ s/=.*/=DEBUG, file/' " install-dir "/conf/log4j.properties")))))
 
 (defn install-flink!
-  [test]
+  [test node]
   (let [url (:tarball test)]
     (info "Installing Flink from" url)
     (cu/install-archive! url install-dir)
@@ -83,12 +78,12 @@
     (c/exec (c/lit (str "ls " install-dir "/opt/flink-s3-fs-hadoop* | xargs -I {} mv {} " install-dir "/lib")))
     (c/upload (:job-jar test) upload-dir)
     (c/exec :mv (str upload-dir "/" (.getName (clojure.java.io/file (:job-jar test)))) install-dir)
-    (write-configuration! test)))
+    (write-configuration! test node)))
 
 (defn teardown-flink!
   []
   (info "Tearing down Flink")
-  (cu/grepkill! "flink")
+  (meh (cu/grepkill! "flink"))
   (meh (c/exec :rm :-rf install-dir))
   (meh (c/exec :rm :-rf (c/lit "/tmp/.yarn-properties*"))))
 
@@ -101,7 +96,7 @@
   (reify db/DB
     (setup! [_ test node]
       (c/su
-        (install-flink! test)))
+        (install-flink! test node)))
 
     (teardown! [_ test node]
       (c/su
@@ -120,26 +115,49 @@
         (doall (map #(db/setup! % test node) dbs))))
     (teardown! [_ test node]
       (c/su
-        (doall (map #(db/teardown! % test node) dbs))))
+        (try
+          (doall (map #(db/teardown! % test node) dbs))
+          (finally (fu/stop-all-supervised-services!)))))
     db/LogFiles
     (log-files [_ test node]
-      (flatten (map #(db/log-files % test node) dbs)))))
+      (->>
+        (filter (partial satisfies? db/LogFiles) dbs)
+        (map #(db/log-files % test node))
+        (flatten)))))
 
-;;; YARN
+(defn- sorted-nodes
+  [test]
+  (-> test :nodes sort))
 
-(defn flink-yarn-db
+(defn- select-nodes
+  [test selector]
+  (-> (sorted-nodes test)
+      selector))
+
+(defn- first-node
+  [test]
+  (select-nodes test first))
+
+(defn- create-env-vars
+  "Expects a map containing environment variables, and returns a string that can be used to set
+  environment variables for a child process using Bash's quick assignment and inheritance trick.
+  For example, for a map {:FOO \"bar\"}, this function returns \"FOO=bar \"."
+  [m]
+  (->>
+    (map #(str (name (first %)) "=" (second %)) m)
+    (clojure.string/join " ")
+    (#(str % " "))))
+
+(defn- hadoop-env-vars
   []
-  (let [zk (zk/db deb-zookeeper-package)
-        hadoop (hadoop/db hadoop-dist-url)
-        flink (flink-db)]
-    (combined-db [hadoop zk flink])))
+  (create-env-vars {:HADOOP_CLASSPATH (str "`" hadoop/install-dir "/bin/hadoop classpath`")
+                    :HADOOP_CONF_DIR  hadoop/hadoop-conf-dir}))
 
 (defn exec-flink!
-  [test cmd args]
+  [cmd args]
   (c/su
     (c/exec (c/lit (str
-                     "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                     "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
+                     (hadoop-env-vars)
                      install-dir "/bin/flink " cmd " " args)))))
 
 (defn flink-run-cli-args
@@ -149,24 +167,84 @@
     ["-d"]
     (if (:main-class test)
       [(str "-c " (:main-class test))]
-      [])
-    (if (= :yarn-job (:deployment-mode test))
-      ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]
       [])))
 
 (defn submit-job!
   ([test] (submit-job! test []))
   ([test cli-args]
-   (exec-flink! test "run" (clojure.string/join
-                             " "
-                             (concat cli-args
-                                     (flink-run-cli-args test)
-                                     [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
-                                      (:job-args test)])))))
-
-(defn first-node
+   (exec-flink! "run" (clojure.string/join
+                        " "
+                        (concat cli-args
+                                (flink-run-cli-args test)
+                                [(str install-dir "/" (last (str/split (:job-jar test) #"/")))
+                                 (:job-args test)])))))
+
+;;; Standalone
+
+(def standalone-master-count 2)
+
+(defn- standalone-master-nodes
   [test]
-  (-> test :nodes sort first))
+  (select-nodes test (partial take standalone-master-count)))
+
+(defn- standalone-taskmanager-nodes
+  [test]
+  (select-nodes test (partial drop standalone-master-count)))
+
+(defn- start-standalone-masters!
+  [test node]
+  (when (some #{node} (standalone-master-nodes test))
+    (fu/create-supervised-service!
+      "flink-master"
+      (str "env " (hadoop-env-vars)
+           install-dir "/bin/jobmanager.sh start-foreground "
+           ">> " log-dir "/jobmanager.log"))))
+
+(defn- start-standalone-taskmanagers!
+  [test node]
+  (when (some #{node} (standalone-taskmanager-nodes test))
+    (fu/create-supervised-service!
+      "flink-taskmanager"
+      (str "env " (hadoop-env-vars)
+           install-dir "/bin/taskmanager.sh start-foreground "
+           ">> " log-dir "/taskmanager.log"))))
+
+(defn- start-flink-db
+  []
+  (reify db/DB
+    (setup! [_ test node]
+      (c/su
+        (start-standalone-masters! test node)
+        (start-standalone-taskmanagers! test node)))
+
+    (teardown! [_ test node]
+      (c/su
+        (when (some #{node} (standalone-master-nodes test))
+          (fu/stop-supervised-service! "flink-master"))
+        (when (some #{node} (standalone-taskmanager-nodes test))
+          (fu/stop-supervised-service! "flink-taskmanager"))))))
+
+(defn flink-standalone-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)
+        start-flink (start-flink-db)]
+    (combined-db [hadoop zk flink start-flink])))
+
+(defn submit-job-from-first-node!
+  [test]
+  (c/on (first-node test)
+        (submit-job! test)))
+
+;;; YARN
+
+(defn flink-yarn-db
+  []
+  (let [zk (zk/db deb-zookeeper-package)
+        hadoop (hadoop/db hadoop-dist-url)
+        flink (flink-db)]
+    (combined-db [hadoop zk flink])))
 
 (defn start-yarn-session!
   [test]
@@ -174,8 +252,7 @@
     (c/on node
           (info "Starting YARN session from" node)
           (c/su
-            (c/exec (c/lit (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                                "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir
+            (c/exec (c/lit (str (hadoop-env-vars)
                                 " " install-dir "/bin/yarn-session.sh -d -jm 2048m -tm 2048m")))
             (submit-job! test)))))
 
@@ -183,7 +260,7 @@
   [test]
   (c/on (first-node test)
         (c/su
-          (submit-job! test))))
+          (submit-job! test ["-m yarn-cluster" "-yjm 2048m" "-ytm 2048m"]))))
 
 ;;; Mesos
 
@@ -203,6 +280,23 @@
                         (fatal e "Could not submit job.")
                         (System/exit 1)))))
 
+(defn mesos-appmaster-cmd
+  "Returns the command used by Marathon to start Flink's Mesos application master."
+  [test]
+  (str (hadoop-env-vars)
+       install-dir "/bin/mesos-appmaster.sh "
+       "-Dmesos.master=" (zookeeper-uri
+                           test
+                           mesos/zk-namespace) " "
+       "-Djobmanager.rpc.address=$(hostname -f) "
+       "-Djobmanager.heap.mb=2048 "
+       "-Djobmanager.rpc.port=6123 "
+       "-Dmesos.resourcemanager.tasks.mem=2048 "
+       "-Dtaskmanager.heap.mb=2048 "
+       "-Dtaskmanager.numberOfTaskSlots=2 "
+       "-Dmesos.resourcemanager.tasks.cpus=1 "
+       "-Drest.bind-address=$(hostname -f) "))
+
 (defn start-mesos-session!
   [test]
   (c/su
@@ -210,21 +304,7 @@
                         (http/post
                           (str (mesos/marathon-base-url test) "/v2/apps")
                           {:form-params  {:id                    "flink"
-                                          :cmd                   (str "HADOOP_CLASSPATH=`" hadoop/install-dir "/bin/hadoop classpath` "
-                                                                      "HADOOP_CONF_DIR=" hadoop/hadoop-conf-dir " "
-                                                                      install-dir "/bin/mesos-appmaster.sh "
-                                                                      "-Dmesos.master=" (zookeeper-uri
-                                                                                          test
-                                                                                          mesos/zk-namespace) " "
-                                                                      "-Djobmanager.rpc.address=$(hostname -f) "
-                                                                      "-Djobmanager.heap.mb=2048 "
-                                                                      "-Djobmanager.rpc.port=6123 "
-                                                                      "-rest.port=8081 "
-                                                                      "-Dmesos.resourcemanager.tasks.mem=2048 "
-                                                                      "-Dtaskmanager.heap.mb=2048 "
-                                                                      "-Dtaskmanager.numberOfTaskSlots=2 "
-                                                                      "-Dmesos.resourcemanager.tasks.cpus=1 "
-                                                                      "-Drest.bind-address=$(hostname -f) ")
+                                          :cmd                   (mesos-appmaster-cmd test)
                                           :cpus                  1.0
                                           :mem                   2048
                                           :maxLaunchDelaySeconds 3}
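For the standalone deployment, `db.clj` assigns roles by sorting the test nodes: the first `standalone-master-count` (2) nodes run a supervised `jobmanager.sh start-foreground`, and the remaining nodes run TaskManagers. With ZooKeeper high availability configured, this presumably gives the cluster a standby master to fail over to when the `kill-job-managers` nemesis strikes. The sketch below reproduces `create-env-vars` from the diff and adds a hypothetical `standalone-roles` helper (not part of the PR) that makes the node split explicit.

```clojure
(require '[clojure.string :as str])

;; create-env-vars as in the diff: {:FOO "bar"} -> "FOO=bar "
;; The trailing space lets the result be prefixed directly to a command.
(defn create-env-vars
  [m]
  (->> (map #(str (name (first %)) "=" (second %)) m)
       (str/join " ")
       (#(str % " "))))

;; Hypothetical helper: the master/TaskManager split used by the
;; standalone deployment, assuming standalone-master-count = 2.
(def standalone-master-count 2)

(defn standalone-roles
  [nodes]
  (let [sorted (sort nodes)]
    {:masters      (take standalone-master-count sorted)
     :taskmanagers (drop standalone-master-count sorted)}))

(standalone-roles ["n3" "n1" "n5" "n2" "n4"])
;; => {:masters ("n1" "n2"), :taskmanagers ("n3" "n4" "n5")}
```

Sorting first makes the split deterministic across all nodes, so each node can decide locally, in `setup!`, whether it should start a master or a TaskManager.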
diff --git a/flink-jepsen/src/jepsen/flink/flink.clj b/flink-jepsen/src/jepsen/flink/flink.clj
index d5d41579c38..c5d0d225932 100644
--- a/flink-jepsen/src/jepsen/flink/flink.clj
+++ b/flink-jepsen/src/jepsen/flink/flink.clj
@@ -15,6 +15,7 @@
 ;; limitations under the License.
 
 (ns jepsen.flink.flink
+  (:gen-class)
   (:require [clojure.tools.logging :refer :all]
             [jepsen
              [cli :as cli]
@@ -24,31 +25,48 @@
             [jepsen.flink.client :refer :all]
             [jepsen.flink.checker :as flink-checker]
             [jepsen.flink.db :as fdb]
-            [jepsen.flink.nemesis :as fn])
-  (:import (jepsen.flink.client Client)))
+            [jepsen.flink.nemesis :as fn]))
 
 (def flink-test-config
-  {:yarn-session  {:db                  (fdb/flink-yarn-db)
-                   :deployment-strategy fdb/start-yarn-session!}
-   :yarn-job      {:db                  (fdb/flink-yarn-db)
-                   :deployment-strategy fdb/start-yarn-job!}
-   :mesos-session {:db                  (fdb/flink-mesos-db)
-                   :deployment-strategy fdb/start-mesos-session!}})
+  {:yarn-session       {:db                  (fdb/flink-yarn-db)
+                        :deployment-strategy fdb/start-yarn-session!}
+   :yarn-job           {:db                  (fdb/flink-yarn-db)
+                        :deployment-strategy fdb/start-yarn-job!}
+   :mesos-session      {:db                  (fdb/flink-mesos-db)
+                        :deployment-strategy fdb/start-mesos-session!}
+   :standalone-session {:db                  (fdb/flink-standalone-db)
+                        :deployment-strategy fdb/submit-job-from-first-node!}})
 
-(defn client-gen
+(def poll-job-running {:type :invoke, :f :job-running?, :value nil})
+(def cancel-job {:type :invoke, :f :cancel-job, :value nil})
+(def poll-job-running-loop (gen/seq (cycle [poll-job-running (gen/sleep 5)])))
+
+(defn default-client-gen
+  "Client generator that polls for the job running status."
   []
   (->
-    (cons {:type :invoke, :f :submit, :value nil}
-          (cycle [{:type :invoke, :f :job-running?, :value nil}
-                  (gen/sleep 5)]))
-    (gen/seq)
+    poll-job-running-loop
     (gen/singlethreaded)))
 
+(defn cancelling-client-gen
+  "Client generator that polls for the job running status, and cancels the job after 15 seconds."
+  []
+  (->
+    (gen/concat (gen/time-limit 15 (default-client-gen))
+                (gen/once cancel-job)
+                (default-client-gen))
+    (gen/singlethreaded)))
+
+(def client-gens
+  {:poll-job-running default-client-gen
+   :cancel-job       cancelling-client-gen})
+
 (defn flink-test
   [opts]
   (merge tests/noop-test
          (let [{:keys [db deployment-strategy]} (-> opts :deployment-mode flink-test-config)
-               {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts]
+               {:keys [job-running-healthy-threshold job-recovery-grace-period]} opts
+               client-gen ((:client-gen opts) client-gens)]
            {:name      "Apache Flink"
             :os        debian/os
             :db        db
@@ -63,7 +81,7 @@
                                                    ((fn/nemesis-generator-factories (:nemesis-gen opts)) opts)
                                                    job-running-healthy-threshold
                                                    job-recovery-grace-period))))
-            :client    (Client. deployment-strategy nil nil nil nil)
+            :client    (create-client deployment-strategy)
             :checker   (flink-checker/job-running-checker)})
          (assoc opts :concurrency 1)))
 
@@ -93,6 +111,12 @@
                      :default :kill-task-managers
                      :validate [#(fn/nemesis-generator-factories (keyword %))
                                (keys-as-allowed-values-help-text fn/nemesis-generator-factories)]]
+                    [nil "--client-gen GEN" (str "Which client should be used?"
+                                                 (keys-as-allowed-values-help-text client-gens))
+                     :parse-fn keyword
+                     :default :poll-job-running
+                     :validate [#(client-gens (keyword %))
+                               (keys-as-allowed-values-help-text client-gens)]]
                     [nil "--deployment-mode MODE" (keys-as-allowed-values-help-text flink-test-config)
                      :parse-fn keyword
                      :default :yarn-session
diff --git a/flink-jepsen/src/jepsen/flink/mesos.clj b/flink-jepsen/src/jepsen/flink/mesos.clj
index a73f25fd489..aef73598da9 100644
--- a/flink-jepsen/src/jepsen/flink/mesos.clj
+++ b/flink-jepsen/src/jepsen/flink/mesos.clj
@@ -22,36 +22,9 @@
              [util :as util :refer [meh]]]
             [jepsen.control.util :as cu]
             [jepsen.os.debian :as debian]
+            [jepsen.flink.utils :refer [create-supervised-service! stop-supervised-service!]]
             [jepsen.flink.zookeeper :refer [zookeeper-uri]]))
 
-;;; runit process supervisor (http://smarden.org/runit/)
-;;;
-;;; We use runit to supervise Mesos processes because Mesos uses a "fail-fast" approach to
-;;; error handling, e.g., the Mesos master will exit when it discovers it has been partitioned away
-;;; from the Zookeeper quorum.
-
-(def runit-version "2.1.2-3")
-
-(defn create-supervised-service!
-  "Registers a service with the process supervisor and starts it."
-  [service-name cmd]
-  (let [service-dir (str "/etc/sv/" service-name)
-        run-script (str service-dir "/run")]
-    (c/su
-      (c/exec :mkdir :-p service-dir)
-      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
-                                               "exec 2>&1"
-                                               (str "exec " cmd)]) :> run-script)
-      (c/exec :chmod :+x run-script)
-      (c/exec :ln :-sf service-dir (str "/etc/service/" service-name)))))
-
-(defn stop-supervised-service!
-  "Stops a service and removes it from supervision."
-  [service-name]
-  (c/su
-    (c/exec :sv :down service-name)
-    (c/exec :rm :-f (str "/etc/service/" service-name))))
-
 ;;; Mesos
 
 (def master-count 1)
@@ -154,8 +127,7 @@
                       "keyserver.ubuntu.com"
                       "E56151BF")
     (debian/install {:mesos    mesos-version
-                     :marathon marathon-version
-                     :runit    runit-version})
+                     :marathon marathon-version})
     (c/exec :mkdir :-p "/var/run/mesos")
     (c/exec :mkdir :-p master-dir)
     (c/exec :mkdir :-p slave-dir)))
diff --git a/flink-jepsen/src/jepsen/flink/nemesis.clj b/flink-jepsen/src/jepsen/flink/nemesis.clj
index 3047eeb9cc1..5335bba874c 100644
--- a/flink-jepsen/src/jepsen/flink/nemesis.clj
+++ b/flink-jepsen/src/jepsen/flink/nemesis.clj
@@ -86,6 +86,11 @@
     (take n)
     (reverse)))
 
+(defn- inc-by-factor
+  [n factor]
+  (assert (>= factor 1))
+  (int (* n factor)))
+
 (defn stop-generator
   [stop source job-running-healthy-threshold job-recovery-grace-period]
   (gen/concat source
@@ -105,9 +110,12 @@
                                                 
(flink-checker/get-job-running-history)
                                                 (take-last-with-default 
job-running-healthy-threshold false))]
                       (if (or
-                            (and
-                              (every? true? job-running-history))
-                            (> (ju/relative-time-nanos) (+ @t (ju/secs->nanos job-recovery-grace-period))))
+                            (every? true? job-running-history)
+                            (> (ju/relative-time-nanos) (+ @t
+                                                           (ju/secs->nanos
+                                                             (inc-by-factor
+                                                               job-recovery-grace-period
+                                                               1.1)))))
                         (do
                           (reset! stop true)
                           nil)
@@ -122,14 +130,14 @@
 (defn kill-taskmanagers-bursts-gen
   [time-limit]
   (fgen/time-limit time-limit
-                  (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
-                                          [(gen/sleep 300)])))))
+                   (gen/seq (cycle (concat (repeat 20 {:type :info, :f :kill-task-managers})
+                                           [(gen/sleep 300)])))))
 
 (defn kill-jobmanagers-gen
   [time-limit]
   (fgen/time-limit (+ time-limit job-submit-grace-period)
-                  (gen/seq (cons (gen/sleep job-submit-grace-period)
-                                 (cycle [{:type :info, :f :kill-job-manager}])))))
+                   (gen/seq (cons (gen/sleep job-submit-grace-period)
+                                  (cycle [{:type :info, :f :kill-job-manager}])))))
 
 (defn fail-name-node-during-recovery
   []
diff --git a/flink-jepsen/src/jepsen/flink/utils.clj b/flink-jepsen/src/jepsen/flink/utils.clj
index 3fd9f961e13..50d0075ca35 100644
--- a/flink-jepsen/src/jepsen/flink/utils.clj
+++ b/flink-jepsen/src/jepsen/flink/utils.clj
@@ -15,7 +15,10 @@
 ;; limitations under the License.
 
 (ns jepsen.flink.utils
-  (:require [clojure.tools.logging :refer :all]))
+  (:require [clojure.tools.logging :refer :all]
+            [jepsen
+             [control :as c]]
+            [jepsen.os.debian :as debian]))
 
 (defn retry
   "Runs a function op and retries on exception.
@@ -46,3 +49,41 @@
          (Thread/sleep delay)
          (recur op (assoc keys :retries (dec retries))))
        (success r)))))
+
+;;; runit process supervisor (http://smarden.org/runit/)
+
+(def runit-version "2.1.2-3")
+
+(defn- install-process-supervisor!
+  "Installs the process supervisor."
+  []
+  (debian/install {:runit runit-version}))
+
+(defn create-supervised-service!
+  "Registers a service with the process supervisor and starts it."
+  [service-name cmd]
+  (let [service-dir (str "/etc/sv/" service-name)
+        run-script (str service-dir "/run")]
+    (info "Create supervised service" service-name)
+    (c/su
+      (install-process-supervisor!)
+      (c/exec :mkdir :-p service-dir)
+      (c/exec :echo (clojure.string/join "\n" ["#!/bin/sh"
+                                               "exec 2>&1"
+                                               (str "exec " cmd)]) :> run-script)
+      (c/exec :chmod :+x run-script)
+      (c/exec :ln :-sfT service-dir (str "/etc/service/" service-name)))))
+
+(defn stop-supervised-service!
+  "Stops a service and removes it from supervision."
+  [service-name]
+  (info "Stop supervised service" service-name)
+  (c/su
+    (c/exec :rm :-f (str "/etc/service/" service-name))))
+
+(defn stop-all-supervised-services!
+  "Stops and removes all services from supervision if any."
+  []
+  (info "Stop all supervised services.")
+  (c/su
+    (c/exec :rm :-f (c/lit (str "/etc/service/*")))))
diff --git a/flink-jepsen/test/jepsen/flink/checker_test.clj b/flink-jepsen/test/jepsen/flink/checker_test.clj
index 7389bbc15e9..c27d751e69e 100644
--- a/flink-jepsen/test/jepsen/flink/checker_test.clj
+++ b/flink-jepsen/test/jepsen/flink/checker_test.clj
@@ -25,46 +25,91 @@
                  {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127443701575}
                  {:type :ok, :f :job-running?, :value false, :process 0, :time 127453553462}
                  {:type :invoke, :f :job-running?, :value nil, :process 0, :time 127453553463}
-                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}]]
-    (is (= (get-job-running-history history) [false true]))))
+                 {:type :ok, :f :job-running?, :value true, :process 0, :time 127453553464}
+                 {:type :info, :f :job-running?, :value nil, :process 0, :time 127453553465}]]
+    (is (= (get-job-running-history history) [false true false]))))
 
 (deftest job-running-checker-test
   (let [checker (job-running-checker)
         test {}
-        model (job-running-within-grace-period 3 60)
+        model (job-running-within-grace-period 3 60 10)
         opts {}
         check (fn [history] (checker/check checker test model history opts))]
-    (testing "Job is not running after grace period."
-      (is (= (:valid? (check
-                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])) false)))
-    (testing "Job is running after grace period."
-      (is (= (:valid? (check
-                        [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}])) true)))
+    (testing "Model should be inconsistent if job is not running after grace period."
+      (let [result (check
+                     [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is not running." (-> result :final-model :msg)))))
+    (testing "Model should be consistent if job is running after grace period."
+      (is (= true (:valid? (check
+                             [{:type :info, :f :kill-task-managers, :process :nemesis, :time 0, :value ["172.31.32.48"]}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000001}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000003}])))))
     (testing "Should tolerate non-running job during failures."
-      (is (= (:valid? (check
-                        [{:type :info, :f :partition-start, :process :nemesis, :time -1}
-                         {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
-                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
-                         {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true)))
-    (testing "Should respect healthy threshold."
-      (is (= (:valid? (check
-                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000003}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) true))
-      (is (= (:valid? (check
-                        [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
-                         {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000002}
-                         {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}])) false)))
-    (testing "Job was not deployed successfully."
-      (is (= (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 45, :time 239150413307}
-                              {:type :info, :f :job-running?, :value nil, :process 45, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])) false)))))
+      (is (= true (:valid? (check
+                             [{:type :info, :f :partition-start, :process :nemesis, :time -1}
+                              {:type :info, :f :partition-start, :process :nemesis, :time 0, :value "Cut off [...]"}
+                              {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                              {:type :info, :f :partition-stop, :process :nemesis, :time 60000000002}
+                              {:type :info, :f :partition-stop, :process :nemesis, :time 60000000003, :value "fully connected"}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000004}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000005}
+                              {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000006}])))))
+    (testing "Should not tolerate non-running job without a cause."
+      (let [result (check
+                     [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                      {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                      {:type :ok, :f :job-running?, :value false, :process 0, :time 60000000001}
+                      {:type :ok, :f :job-running?, :value true, :process 0, :time 60000000002}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is not running." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if job submission was unsuccessful."
+      (let [result (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 239150413307}
+                           {:type :info, :f :job-running?, :value nil, :process 0, :time 239150751938, :error "indeterminate: Assert failed: job-id"}])]
+        (is (= false (:valid? result)))))
+    (testing "Model should be inconsistent if the job status cannot be polled, i.e., if the cluster is unavailable."
+      (let [result (check [{:type :fail, :f :job-running?, :value nil, :process 0, :time 0 :error "Error"}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001 :error "Error"}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000002 :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Should tolerate non-running job after cancellation."
+      (is (= true (:valid? (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 3}])))))
+    (testing "Model should be inconsistent if job is running after cancellation."
+      (let [result (check [{:type :invoke, :f :cancel-job, :value nil, :process 0, :time 0}
+                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 1}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 10000000002}])]
+        (is (= false (:valid? result)))
+        (is (= "Job is running after cancellation." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if Flink cluster is not available at the end."
+      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 1}
+                           {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000003, :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Model should be inconsistent if Flink cluster is not available after job cancellation."
+      (let [result (check [{:type :ok, :f :job-running?, :value true, :process 0, :time 0}
+                           {:type :invoke, :f :cancel-job, :value nil, :process 0, :time 1}
+                           {:type :ok, :f :cancel-job, :value true, :process 0, :time 2}
+                           {:type :fail, :f :job-running?, :value nil, :process 0, :time 60000000001, :error "Error"}])]
+        (is (= false (:valid? result)))
+        (is (= "Cluster is not running." (-> result :final-model :msg)))))
+    (testing "Should throw AssertionError if job cancelling operation failed."
+      (is (thrown-with-msg? AssertionError
+                            #":cancel-job must not fail"
+                            (check [{:type :fail, :f :cancel-job, :value nil, :process 0, :time 0}]))))
+    (testing "Should tolerate non-running job if grace period has not passed."
+      (is (= true (:valid? (check [{:type :invoke, :f :job-running?, :value nil, :process 0, :time 0}
+                                   {:type :ok, :f :job-running?, :value false, :process 0, :time 1}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 2}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 3}
+                                   {:type :ok, :f :job-running?, :value true, :process 0, :time 4}])))))))
 
 (deftest safe-inc-test
   (is (= (safe-inc nil) 1))
diff --git a/flink-jepsen/test/jepsen/flink/client_test.clj b/flink-jepsen/test/jepsen/flink/client_test.clj
index b4373bfb124..a73c936d08d 100644
--- a/flink-jepsen/test/jepsen/flink/client_test.clj
+++ b/flink-jepsen/test/jepsen/flink/client_test.clj
@@ -17,7 +17,8 @@
 (ns jepsen.flink.client-test
   (:require [clojure.test :refer :all]
             [clj-http.fake :as fake]
-            [jepsen.flink.client :refer :all]))
+            [jepsen.flink.client :refer :all])
+  (:import (clojure.lang ExceptionInfo)))
 
 (deftest read-url-test
   (is (= "https://www.asdf.de" (read-url (byte-array [0xAC 0xED 0x00 0x05 0x77 0x15 0x00 0x13 0x68 0x74 0x74 0x70 0x73 0x3A 0x2F 0x2F 0x77 0x77 0x77 0x2E 0x61 0x73 0x64 0x66 0x2E 0x64 0x65])))))
@@ -25,13 +26,52 @@
 (deftest job-running?-test
   (fake/with-fake-routes
     {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
-     (fn [request] {:status  200
-                    :headers {}
-                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
-     "http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196d"
-     (fn [request] {:status  200
-                    :headers {}
-                    :body    "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})}
+     (fn [_] {:status 200
+              :body   "{\"jid\":\"a718f168ec6be8eff1345a17bf64196c\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
+
+     "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2"
+     (fn [_] {:status 200
+              :body   "{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1522059578198,\"end-time\":-1,\"duration\":19505,\"now\":1522059597703,\"timestamps\":{\"RUNNING\":1522059578244,\"RESTARTING\":0,\"RECONCILING\":0,\"CREATED\":1522059578198,\"FAILING\":0,\"FINISHED\":0,\"CANCELLING\":0,\"SUSPENDING\":0,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0},\"vertices\":[{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"name\":\"Source: Socket Stream -> Flat Map\",\"parallelism\":1,\"status\":\"CREATED\",\"start-time\":1522059578369,\"end-time\":-1,\"duration\":19334,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}},{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"name\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -> Sink: Print to Std. Out\",\"parallelism\":1,\"status\":\"RUNNING\",\"start-time\":1522059578381,\"end-time\":-1,\"duration\":19322,\"tasks\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":1,\"CREATED\":0,\"FINISHED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":false,\"write-bytes\":0,\"write-bytes-complete\":false,\"read-records\":0,\"read-records-complete\":false,\"write-records\":0,\"write-records-complete\":false}}],\"status-counts\":{\"DEPLOYING\":0,\"SCHEDULED\":0,\"CANCELED\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FAILED\":0,\"RUNNING\":2,\"CREATED\":0,\"FINISHED\":0},\"plan\":{\"jid\":\"54ae4d8ec01d85053d7eb5d139492df2\",\"name\":\"Socket Window WordCount\",\"nodes\":[{\"id\":\"90bea66de1c231edf33913ecd54406c1\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Window(TumblingProcessingTimeWindows(5000), ProcessingTimeTrigger, ReduceFunction$1, PassThroughWindowFunction) -&gt; Sink: Print to Std. Out\",\"inputs\":[{\"num\":0,\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"cbc357ccb763df2852fee8c4fc7d55f2\",\"parallelism\":1,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Socket Stream -&gt; Flat Map\",\"optimizer_properties\":{}}]}}"})
+
+     "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7"
+     (fn [_] {:status 404})}
 
     (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196c") true))
-    (is (= (job-running? "http://localhost:8081" "a718f168ec6be8eff1345a17bf64196d") false))))
+    (is (= (job-running? "http://localhost:8081" "54ae4d8ec01d85053d7eb5d139492df2") false))
+    (is (= (job-running? "http://localhost:8081" "ec3a61df646e665d31899bb26aba10b7") false))))
+
+(deftest cancel-job!-test
+  (fake/with-fake-routes
+    {"http://localhost:8081/jobs/a718f168ec6be8eff1345a17bf64196c"
+     {:patch (fn [_] {:status 202})}
+
+     "http://localhost:8081/jobs/54ae4d8ec01d85053d7eb5d139492df2"
+     {:patch (fn [_] {:status 404})}
+
+     "http://localhost:8081/jobs/ec3a61df646e665d31899bb26aba10b7"
+     {:patch (fn [_] {:status 503})}}
+
+    (testing "Successful job cancellation."
+      (is (= true (@#'jepsen.flink.client/cancel-job!
+                    "http://localhost:8081"
+                    "a718f168ec6be8eff1345a17bf64196c"))))
+
+    (testing "Job not found."
+      (is (= false (@#'jepsen.flink.client/cancel-job!
+                     "http://localhost:8081"
+                     "54ae4d8ec01d85053d7eb5d139492df2"))))
+
+    (testing "Throw if HTTP status code is 503."
+      (is (thrown-with-msg? ExceptionInfo
+                            #"Job cancellation unsuccessful"
+                            (@#'jepsen.flink.client/cancel-job!
+                              "http://localhost:8081"
+                              "ec3a61df646e665d31899bb26aba10b7"))))))
+
+(deftest dispatch-operation-test
+  (let [op {:type :invoke, :f :job-running?, :value nil}
+        test-fn (constantly 1)]
+    (testing "Dispatching operation completes normally."
+      (is (= {:type :ok :value 1} (select-keys (dispatch-operation op (test-fn)) [:type :value]))))
+    (testing "Returned operation should be of type :fail and have a nil value on exception."
+      (is (= {:type :fail :value nil :error "expected"} (select-keys (dispatch-operation op (throw (Exception. "expected"))) [:type :value :error]))))))
diff --git a/flink-jepsen/test/jepsen/flink/nemesis_test.clj b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
new file mode 100644
index 00000000000..488631ffea7
--- /dev/null
+++ b/flink-jepsen/test/jepsen/flink/nemesis_test.clj
@@ -0,0 +1,32 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;;     http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+
+(ns jepsen.flink.nemesis-test
+  (:require [clojure.test :refer :all])
+  (:require [jepsen.flink.nemesis :refer :all]))
+
+(deftest inc-by-factor-test
+  (testing "Should not increase if factor is 1."
+    (is (= 10 (@#'jepsen.flink.nemesis/inc-by-factor 10 1))))
+
+  (testing "Should increase by factor."
+    (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.5))))
+
+  (testing "Should round down."
+    (is (= 15 (@#'jepsen.flink.nemesis/inc-by-factor 10 1.52))))
+
+  (testing "Should throw if factor < 1."
+    (is (thrown? AssertionError (@#'jepsen.flink.nemesis/inc-by-factor 1 0)))))


 



> HA end-to-end/Jepsen tests for standby Dispatchers
> --------------------------------------------------
>
>                 Key: FLINK-10311
>                 URL: https://issues.apache.org/jira/browse/FLINK-10311
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> We should add end-to-end or Jepsen tests to verify the HA behaviour when
> using multiple standby Dispatchers. In particular, we should verify that jobs
> get properly cleaned up after they have finished successfully (see FLINK-10255
> and FLINK-10011):
> 1. Test that a standby Dispatcher does not affect job execution and resource cleanup
> 2. Test that a standby Dispatcher recovers all submitted jobs after the leader loses leadership



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
