Repository: storm Updated Branches: refs/heads/1.x-branch b511a8b4f -> 3a4825ed9
[STORM-1696]-1.x-branch status not sync if zk fails in backpressure Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9271056b Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9271056b Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9271056b Branch: refs/heads/1.x-branch Commit: 9271056b22ab5c734157a9ca1f3f4ab9a28d4b4b Parents: 6af0d10 Author: zhuol <[email protected]> Authored: Thu Apr 7 18:12:33 2016 -0500 Committer: zhuol <[email protected]> Committed: Thu Apr 7 18:12:33 2016 -0500 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/daemon/worker.clj | 19 +++++++++++-------- .../org/apache/storm/utils/DisruptorQueue.java | 8 ++++++-- 2 files changed, 17 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9271056b/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 778e83d..b8bc423 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -138,16 +138,19 @@ assignment-id (:assignment-id worker) port (:port worker) storm-cluster-state (:storm-cluster-state worker) - prev-backpressure-flag @(:backpressure worker)] - (when executors - (reset! (:backpressure worker) - (or @(:transfer-backpressure worker) - (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))))) + prev-backpressure-flag @(:backpressure worker) + ;; the backpressure flag is true if at least one of the disruptor queues has throttle-on + curr-backpressure-flag (if executors + (or (.getThrottleOn (:transfer-queue worker)) + (reduce #(or %1 %2) (map #(.get-backpressure-flag %1) executors))) + prev-backpressure-flag)] ;; update the worker's backpressure flag to zookeeper only when it has changed - (log-debug "BP " @(:backpressure worker) " WAS " prev-backpressure-flag) - (when (not= prev-backpressure-flag @(:backpressure worker)) + (when (not= prev-backpressure-flag curr-backpressure-flag) (try - (.worker-backpressure! storm-cluster-state storm-id assignment-id port @(:backpressure worker)) + (log-debug "worker backpressure flag changing from " prev-backpressure-flag " to " curr-backpressure-flag) + (.worker-backpressure! storm-cluster-state storm-id assignment-id port curr-backpressure-flag) + ;; doing the local reset after the zk update succeeds is very important to avoid a bad state upon zk exception + (reset! (:backpressure worker) curr-backpressure-flag) (catch Exception exc (log-error exc "workerBackpressure update failed when connecting to ZK ... will retry")))) )))) http://git-wip-us.apache.org/repos/asf/storm/blob/9271056b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 19aba06..9f39d06 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -146,8 +146,8 @@ public class DisruptorQueue implements IStatefulObject { if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) { try { if (!_throttleOn) { - _cb.highWaterMark(); _throttleOn = true; + _cb.highWaterMark(); } } catch (Exception e) { throw new RuntimeException("Exception during calling highWaterMark callback!", e); @@ -200,8 +200,8 @@ public class DisruptorQueue implements IStatefulObject { if (_enableBackpressure && _cb != null && (_metrics.population() + _overflowCount.get()) >= _highWaterMark) { try { if (!_throttleOn) { - _cb.highWaterMark(); _throttleOn = true; + _cb.highWaterMark(); } } catch (Exception e) { throw new RuntimeException("Exception during calling highWaterMark callback!", e); @@ -537,6 +537,10 @@ public class DisruptorQueue implements IStatefulObject { return this; } + public boolean getThrottleOn() { + return _throttleOn; + } + //This method enables the metrics to be accessed from outside of the DisruptorQueue class public QueueMetrics getMetrics() { return _metrics;
