Ethanlm commented on a change in pull request #3297:
URL: https://github.com/apache/storm/pull/3297#discussion_r447054327



##########
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
##########
@@ -215,8 +215,11 @@ private Object loadWorker(IStateStorage stateStorage, 
IStormClusterState stormCl
                 }
             });
 
+        Integer execHeartBeatFreqSecs = 
workerState.stormClusterState.isPacemakerStateStore()
+            ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)

Review comment:
       We might want to update the comment here 
https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/Config.java#L1678-L1684
   since this config is no longer deprecated. and it is used when Pacemaker is 
in use.

##########
File path: storm-client/src/jvm/org/apache/storm/cluster/IStormClusterState.java
##########
@@ -74,6 +74,13 @@
      */
     boolean isAssignmentsBackendSynchronized();
 
+    /**
+     * Flag to indicate if the Pacameker is backup store.

Review comment:
       Do you mean `backend store`? 

##########
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
##########
@@ -361,7 +364,11 @@ public void doHeartBeat() throws IOException {
         state.setWorkerHeartBeat(lsWorkerHeartbeat);
         state.cleanup(60); // this is just in case supervisor is down so that 
disk doesn't fill up.
         // it shouldn't take supervisor 120 seconds between listing dir and 
reading it
-        heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+
+        if (!workerState.stormClusterState.isPacemakerStateStore()) {
+            LOG.debug("If pacemaker is not used, send supervisor");

Review comment:
       We should delete `If`. Since if this message is logged, it means 
pacemaker is not being used.

##########
File path: storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
##########
@@ -377,15 +378,31 @@ public StormTimer getUserTimer() {
     public SmartThread makeTransferThread() {
         return workerTransfer.makeTransferThread();
     }
-    
+
+    public void suicideIfLocalAssignmentsChanged(Assignment assignment) {
+        if (assignment != null) {
+            Set<List<Long>> assignedExecutors = new 
HashSet<>(readWorkerExecutors(assignmentId, port, assignment));
+            if (!localExecutors.equals(assignedExecutors)) {
+                LOG.info("Found conflicting assignments. We shouldn't be 
alive!"
+                         + " Assigned: " + assignedExecutors + ", Current: "
+                         + localExecutors);
+                if (!ConfigUtils.isLocalMode(conf)) {
+                    suicideCallback.run();
+                } else {
+                    LOG.info("Local worker tried to commit suicide!");
+                }
+            }
+        }
+    }
+
     public void refreshConnections() {
         Assignment assignment = null;
         try {
             assignment = getLocalAssignment(stormClusterState, topologyId);
         } catch (Exception e) {
             LOG.warn("Failed to read assignment. This should only happen when 
topology is shutting down.", e);
         }
-
+        suicideIfLocalAssignmentsChanged(assignment);

Review comment:
       We can remove this from this PR since it is being addressed in your 
other PR https://github.com/apache/storm/pull/3291




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to