Remove all legacy dependencies on the hard-coded name "controller" for identifying controller nodes.

The code should check the instance type to determine whether an instance is a
controller instead of relying on its name, because the name can be configured
as any string.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/5ffab62f
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/5ffab62f
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/5ffab62f

Branch: refs/heads/master
Commit: 5ffab62fc96007cb636a6da97be0e62ddb63cd91
Parents: 89089b4
Author: Jiajun Wang <[email protected]>
Authored: Tue Nov 14 00:38:44 2017 -0800
Committer: Junkai Xue <[email protected]>
Committed: Wed Jan 24 18:32:52 2018 -0800

----------------------------------------------------------------------
 .../DefaultSchedulerMessageHandlerFactory.java  |   4 +-
 .../messaging/DefaultMessagingService.java      |   4 +-
 .../handling/HelixStateTransitionHandler.java   |  21 ++-
 .../helix/messaging/handling/HelixTask.java     |  22 +--
 .../messaging/handling/HelixTaskExecutor.java   |  32 ++--
 .../java/org/apache/helix/model/Message.java    |   4 +-
 .../participant/HelixStateMachineEngine.java    |   2 +-
 .../org/apache/helix/util/StatusUpdateUtil.java | 149 ++++++++++++-------
 8 files changed, 141 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
index 51bb5c3..443df9f 100644
--- 
a/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
+++ 
b/helix-core/src/main/java/org/apache/helix/manager/zk/DefaultSchedulerMessageHandlerFactory.java
@@ -80,7 +80,7 @@ public class DefaultSchedulerMessageHandlerFactory implements 
MessageHandlerFact
           + _timeout + " Ms");
 
       _statusUpdateUtil.logError(_originalMessage, 
SchedulerAsyncCallback.class, "Task timeout",
-          _manager.getHelixDataAccessor());
+          _manager);
       addSummary(_resultSummaryMap, _originalMessage, _manager, true);
     }
 
@@ -94,7 +94,7 @@ public class DefaultSchedulerMessageHandlerFactory implements 
MessageHandlerFact
       if (this.isDone()) {
         _logger.info("Scheduler msg " + _originalMessage.getMsgId() + " 
completed");
         _statusUpdateUtil.logInfo(_originalMessage, 
SchedulerAsyncCallback.class,
-            "Scheduler task completed", _manager.getHelixDataAccessor());
+            "Scheduler task completed", _manager);
         addSummary(_resultSummaryMap, _originalMessage, _manager, false);
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index e776dc2..fb84fd7 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -204,7 +204,7 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
     Message newMessage = new Message(message.getRecord(), id);
     newMessage.setMsgId(id);
     newMessage.setSrcName(_manager.getInstanceName());
-    newMessage.setTgtName("Controller");
+    newMessage.setTgtName(InstanceType.CONTROLLER.name());
     messages.add(newMessage);
     return messages;
   }
@@ -298,7 +298,7 @@ public class DefaultMessagingService implements 
ClusterMessagingService {
 
       if (_manager.getInstanceType() == InstanceType.CONTROLLER
           || _manager.getInstanceType() == 
InstanceType.CONTROLLER_PARTICIPANT) {
-        nopMsg.setTgtName("Controller");
+        nopMsg.setTgtName(InstanceType.CONTROLLER.name());
         accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), 
nopMsg);
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
index ece7ac7..9412dde 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixStateTransitionHandler.java
@@ -89,7 +89,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
               + Arrays.toString(Message.Attributes.values());
 
       _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, 
errorMessage,
-          _manager.getHelixDataAccessor());
+          _manager);
       logger.error(errorMessage);
       throw new HelixException(errorMessage);
     }
@@ -119,7 +119,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
               + _message.getTgtName();
 
       _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, 
errorMessage,
-          accessor);
+          _manager);
       logger.error(errorMessage);
       throw new HelixStateMismatchException(errorMessage);
     }
@@ -162,7 +162,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
                                  HelixStateTransitionHandler.class,
                                  e,
                                  "Error when removing " + 
CurrentState.CurrentStateProperty.REQUESTED_STATE.name() +  " from current 
state.",
-                                 accessor);
+                                 _manager);
     }
   }
 
@@ -286,7 +286,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
           new StateTransitionError(ErrorType.FRAMEWORK, ErrorCode.ERROR, e);
       _stateModel.rollbackOnError(_message, _notificationContext, error);
       _statusUpdateUtil.logError(_message, HelixStateTransitionHandler.class, 
e,
-          "Error when update current-state ", accessor);
+          "Error when update current-state ", _manager);
     }
   }
 
@@ -311,15 +311,14 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
     synchronized (_stateModel) {
       HelixTaskResult taskResult = new HelixTaskResult();
       HelixManager manager = context.getManager();
-      HelixDataAccessor accessor = manager.getHelixDataAccessor();
 
       _statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
-          "Message handling task begin execute", accessor);
+          "Message handling task begin execute", manager);
       message.setExecuteStartTimeStamp(new Date().getTime());
 
       try {
         preHandleMessage();
-        invoke(accessor, context, taskResult, message);
+        invoke(manager, context, taskResult, message);
       } catch (HelixStateMismatchException e) {
         // Simply log error and return from here if State mismatch.
         // The current state of the state model is intact.
@@ -344,7 +343,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
           taskResult.setCancelled(true);
         } else {
           _statusUpdateUtil
-              .logError(message, HelixStateTransitionHandler.class, e, 
errorMessage, accessor);
+              .logError(message, HelixStateTransitionHandler.class, e, 
errorMessage, manager);
           taskResult.setSuccess(false);
           taskResult.setMessage(e.toString());
           taskResult.setException(e);
@@ -361,11 +360,11 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
     }
   }
 
-  private void invoke(HelixDataAccessor accessor, NotificationContext context,
+  private void invoke(HelixManager manager, NotificationContext context,
       HelixTaskResult taskResult, Message message) throws 
IllegalAccessException,
       InvocationTargetException, InterruptedException, HelixRollbackException {
     _statusUpdateUtil.logInfo(message, HelixStateTransitionHandler.class,
-        "Message handling invoking", accessor);
+        "Message handling invoking", manager);
 
     // by default, we invoke state transition function in state model
     Method methodToInvoke = null;
@@ -417,7 +416,7 @@ public class HelixStateTransitionHandler extends 
MessageHandler {
       taskResult.setSuccess(false);
 
       _statusUpdateUtil
-          .logError(message, HelixStateTransitionHandler.class, errorMessage, 
accessor);
+          .logError(message, HelixStateTransitionHandler.class, errorMessage, 
manager);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 2543d81..85665c1 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -79,7 +79,7 @@ public class HelixTask implements MessageTask {
     logger.info("handling task: " + getTaskId() + " begin, at: " + start);
     HelixDataAccessor accessor = _manager.getHelixDataAccessor();
     _statusUpdateUtil.logInfo(_message, HelixTask.class, "Message handling 
task begin execute",
-        accessor);
+        _manager);
     _message.setExecuteStartTimeStamp(new Date().getTime());
 
     // add a concurrent map to hold currentStateUpdates for sub-messages of a 
batch-message
@@ -99,7 +99,7 @@ public class HelixTask implements MessageTask {
       taskResult.setInterrupted(true);
 
       _statusUpdateUtil.logError(_message, HelixTask.class, e,
-          "State transition interrupted, timeout:" + _isTimeout, accessor);
+          "State transition interrupted, timeout:" + _isTimeout, _manager);
       logger.info("Message " + _message.getMsgId() + " is interrupted");
     } catch (Exception e) {
       taskResult = new HelixTaskResult();
@@ -110,7 +110,7 @@ public class HelixTask implements MessageTask {
           "Exception while executing a message. " + e + " msgId: " + 
_message.getMsgId()
               + " type: " + _message.getMsgType();
       logger.error(errorMessage, e);
-      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, 
accessor);
+      _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, 
_manager);
     }
 
     // cancel timeout task
@@ -120,7 +120,7 @@ public class HelixTask implements MessageTask {
     try {
       if (taskResult.isSuccess()) {
         _statusUpdateUtil
-            .logInfo(_message, _handler.getClass(), "Message handling task 
completed successfully", accessor);
+            .logInfo(_message, _handler.getClass(), "Message handling task 
completed successfully", _manager);
         logger.info("Message " + _message.getMsgId() + " completed.");
         _executor.getParticipantMonitor().reportProcessedMessage(_message, 
ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
       } else {
@@ -134,7 +134,7 @@ public class HelixTask implements MessageTask {
             logger.info("Message timeout, retry count: " + retryCount + " 
msgId:"
                 + _message.getMsgId());
             _statusUpdateUtil.logInfo(_message, _handler.getClass(),
-                "Message handling task timeout, retryCount:" + retryCount, 
accessor);
+                "Message handling task timeout, retryCount:" + retryCount, 
_manager);
             // Notify the handler that timeout happens, and the number of 
retries left
             // In case timeout happens (time out and also interrupted)
             // we should retry the execution of the message by re-schedule it 
in
@@ -151,7 +151,7 @@ public class HelixTask implements MessageTask {
           type = null;
           _statusUpdateUtil
               .logInfo(_message, _handler.getClass(), "Cancellation completed 
successfully",
-                  accessor);
+                  _manager);
           _executor.getParticipantMonitor().reportProcessedMessage(
               _message, 
ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
         } else {// logging for errors
@@ -160,7 +160,7 @@ public class HelixTask implements MessageTask {
               "Message execution failed. msgId: " + getTaskId() + ", errorMsg: 
"
                   + taskResult.getMessage();
           logger.error(errorMsg);
-          _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, 
accessor);
+          _statusUpdateUtil.logError(_message, _handler.getClass(), errorMsg, 
_manager);
           _executor.getParticipantMonitor().reportProcessedMessage(
               _message, 
ParticipantMessageMonitor.ProcessedMessageState.FAILED);
         }
@@ -185,7 +185,7 @@ public class HelixTask implements MessageTask {
       String errorMessage =
           "Exception after executing a message, msgId: " + _message.getMsgId() 
+ e;
       logger.error(errorMessage, e);
-      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, 
accessor);
+      _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, 
_manager);
     } finally {
       long end = System.currentTimeMillis();
       logger.info("msg: " + _message.getMsgId() + " handling task completed, 
results:"
@@ -206,7 +206,7 @@ public class HelixTask implements MessageTask {
   private void removeMessageFromZk(HelixDataAccessor accessor, Message 
message) {
     Builder keyBuilder = accessor.keyBuilder();
     PropertyKey msgKey;
-    if (message.getTgtName().equalsIgnoreCase("controller")) {
+    if (message.getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name())) 
{
       msgKey = keyBuilder.controllerMessage(message.getMsgId());
     } else {
       msgKey = keyBuilder.message(_manager.getInstanceName(), 
message.getMsgId());
@@ -259,7 +259,7 @@ public class HelixTask implements MessageTask {
     if (_message.getCorrelationId() != null
         && !message.getMsgType().equals(MessageType.TASK_REPLY.name())) {
       logger.info("Sending reply for message " + message.getCorrelationId());
-      _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", 
accessor);
+      _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", 
_manager);
 
       taskResult.getTaskResultMap().put("SUCCESS", "" + 
taskResult.isSuccess());
       taskResult.getTaskResultMap().put("INTERRUPTED", "" + 
taskResult.isInterrupted());
@@ -280,7 +280,7 @@ public class HelixTask implements MessageTask {
         
accessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), 
replyMessage);
       }
       _statusUpdateUtil.logInfo(message, HelixTask.class,
-          "1 msg replied to " + replyMessage.getTgtName(), accessor);
+          "1 msg replied to " + replyMessage.getTgtName(), _manager);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index c0be583..064b6fd 100644
--- 
a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ 
b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -398,10 +398,11 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
     String taskId = task.getTaskId();
     Message message = task.getMessage();
     NotificationContext notificationContext = task.getNotificationContext();
+    HelixManager manager = notificationContext.getManager();
 
     try {
       // Check to see if dedicate thread pool for handling state transition 
messages is configured or provided.
-      updateStateTransitionMessageThreadPool(message, 
notificationContext.getManager());
+      updateStateTransitionMessageThreadPool(message, manager);
 
       LOG.info("Scheduling message: " + taskId);
       // System.out.println("sched msg: " + message.getPartitionName() + "-"
@@ -409,8 +410,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
       // + message.getToState());
 
       _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
-          "Message handling task scheduled", notificationContext.getManager()
-              .getHelixDataAccessor());
+          "Message handling task scheduled", manager);
 
       // this sync guarantees that ExecutorService.submit() task and put 
taskInfo into map are
       // sync'ed
@@ -441,23 +441,19 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
           } else {
             LOG.debug("Message does not have timeout. MsgId: " + 
task.getTaskId());
           }
-
           _taskMap.put(taskId, new MessageTaskInfo(task, future, timerTask));
 
           LOG.info("Message: " + taskId + " handling task scheduled");
-
           return true;
         } else {
           _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
-              "Message handling task already sheduled for " + taskId, 
notificationContext
-                  .getManager().getHelixDataAccessor());
+              "Message handling task already sheduled for " + taskId, manager);
         }
       }
     } catch (Exception e) {
       LOG.error("Error while executing task. " + message, e);
-
       _statusUpdateUtil.logError(message, HelixTaskExecutor.class, e, "Error 
while executing task "
-          + e, notificationContext.getManager().getHelixDataAccessor());
+          + e, manager);
     }
     return false;
   }
@@ -480,25 +476,25 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
         Future<HelixTaskResult> future = taskInfo.getFuture();
         removeMessageFromTaskAndFutureMap(message);
         _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "Canceling 
task: " + taskId,
-            notificationContext.getManager().getHelixDataAccessor());
+            notificationContext.getManager());
 
         // If the thread is still running it will be interrupted if 
cancel(true)
         // is called. So state transition callbacks should implement logic to
         // return if it is interrupted.
         if (future.cancel(true)) {
           _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, 
"Canceled task: " + taskId,
-              notificationContext.getManager().getHelixDataAccessor());
+              notificationContext.getManager());
           _taskMap.remove(taskId);
           return true;
         } else {
           _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
               "fail to cancel task: " + taskId,
-              notificationContext.getManager().getHelixDataAccessor());
+              notificationContext.getManager());
         }
       } else {
         _statusUpdateUtil.logWarning(message, HelixTaskExecutor.class,
             "fail to cancel task: " + taskId + ", future not found",
-            notificationContext.getManager().getHelixDataAccessor());
+            notificationContext.getManager());
       }
     }
     return false;
@@ -807,7 +803,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
                 + message.getMsgId();
         LOG.warn(warningMessage);
         reportAndRemoveMessage(message, accessor, instanceName, 
ProcessedMessageState.DISCARDED);
-        _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, 
warningMessage, accessor);
+        _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, 
warningMessage, manager);
 
         // Proactively send a session sync message from participant to 
controller
         // upon session mismatch after a new session is established
@@ -873,7 +869,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         String error =
             "Failed to create message handler for " + message.getMsgId() + ", 
exception: " + e;
 
-        _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, 
error, accessor);
+        _statusUpdateUtil.logError(message, HelixStateMachineEngine.class, e, 
error, manager);
 
         message.setMsgState(MessageState.UNPROCESSABLE);
         removeMessageFromZK(accessor, message, instanceName);
@@ -882,7 +878,7 @@ public class HelixTaskExecutor implements MessageListener, 
TaskExecutor {
         continue;
       }
 
-      markReadMessage(message, changeContext, accessor);
+      markReadMessage(message, changeContext, manager);
       readMsgs.add(message);
 
       // batch creation of all current state meta data
@@ -1003,12 +999,12 @@ public class HelixTaskExecutor implements 
MessageListener, TaskExecutor {
   }
 
   private void markReadMessage(Message message, NotificationContext context,
-      HelixDataAccessor accessor) {
+      HelixManager manager) {
     message.setMsgState(MessageState.READ);
     message.setReadTimeStamp(new Date().getTime());
     message.setExecuteSessionId(context.getManager().getSessionId());
 
-    _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New 
Message", accessor);
+    _statusUpdateUtil.logInfo(message, HelixStateMachineEngine.class, "New 
Message", manager);
   }
 
   public MessageHandler createMessageHandler(Message message, 
NotificationContext changeContext) {

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java 
b/helix-core/src/main/java/org/apache/helix/model/Message.java
index d987c54..9f0054c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -644,7 +644,7 @@ public class Message extends HelixProperty {
     replyMessage.setMsgState(MessageState.NEW);
     replyMessage.setSrcName(instanceName);
     if (srcMessage.getSrcInstanceType() == InstanceType.CONTROLLER) {
-      replyMessage.setTgtName("Controller");
+      replyMessage.setTgtName(InstanceType.CONTROLLER.name());
     } else {
       replyMessage.setTgtName(srcMessage.getMsgSrc());
     }
@@ -849,7 +849,7 @@ public class Message extends HelixProperty {
    * @return true if this is a controller message, false otherwise
    */
   public boolean isControlerMsg() {
-    return getTgtName().equalsIgnoreCase("controller");
+    return getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
 
b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
index cd424f8..3a13180 100644
--- 
a/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
+++ 
b/helix-core/src/main/java/org/apache/helix/participant/HelixStateMachineEngine.java
@@ -127,7 +127,7 @@ public class HelixStateMachineEngine implements 
StateMachineEngine {
 
         if (_manager.getInstanceType() == InstanceType.CONTROLLER
             || _manager.getInstanceType() == 
InstanceType.CONTROLLER_PARTICIPANT) {
-          nopMsg.setTgtName("Controller");
+          nopMsg.setTgtName(InstanceType.CONTROLLER.name());
           accessor.setProperty(keyBuilder.controllerMessage(nopMsg.getId()), 
nopMsg);
         }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/5ffab62f/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
index 3eb20ef..90e0d24 100644
--- a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java
@@ -32,13 +32,17 @@ import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.helix.*;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.model.Error;
 import org.apache.helix.model.Message;
-import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StatusUpdate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -310,6 +314,18 @@ public class StatusUpdateUtil {
     return String.format("%4s %26s ", level.toString(), time) + recordId;
   }
 
+  @Deprecated
+  public void logMessageStatusUpdateRecord(Message message, Level level, Class 
classInfo,
+      String additionalInfo, HelixDataAccessor accessor) {
+    try {
+      ZNRecord record = createMessageStatusUpdateRecord(message, level, 
classInfo, additionalInfo);
+      publishStatusUpdateRecord(record, message, level, accessor,
+          
message.getTgtName().equalsIgnoreCase(InstanceType.CONTROLLER.name()));
+    } catch (Exception e) {
+      _logger.error("Exception while logging status update", e);
+    }
+  }
+
   /**
    * Create a statusupdate that is related to a cluster manager message, then 
record it to
    * the zookeeper store.
@@ -321,14 +337,16 @@ public class StatusUpdateUtil {
    *          class info about the class that reports the status update
    * @param additionalInfo
    *          info the additional debug information
-   * @param accessor
-   *          the zookeeper data accessor that writes the status update to 
zookeeper
+   * @param manager
+   *          the HelixManager that writes the status update to zookeeper
    */
   public void logMessageStatusUpdateRecord(Message message, Level level, Class 
classInfo,
-      String additionalInfo, HelixDataAccessor accessor) {
+      String additionalInfo, HelixManager manager) {
     try {
       ZNRecord record = createMessageStatusUpdateRecord(message, level, 
classInfo, additionalInfo);
-      publishStatusUpdateRecord(record, message, level, accessor);
+      publishStatusUpdateRecord(record, message, level, 
manager.getHelixDataAccessor(),
+          manager.getInstanceType().equals(InstanceType.CONTROLLER) || 
manager.getInstanceType()
+              .equals(InstanceType.CONTROLLER_PARTICIPANT));
     } catch (Exception e) {
       _logger.error("Exception while logging status update", e);
     }
@@ -338,39 +356,67 @@ public class StatusUpdateUtil {
     RebalanceResourceFailure,
   }
 
-  public void logError(ErrorType errorType, Class classInfo, String 
additionalInfo,
-      HelixManager helixManager) {
+  public void logError(ErrorType errorType, Class classInfo, String 
additionalInfo, HelixManager helixManager) {
     if (helixManager != null) {
-      logError(errorType, "ErrorInfo", helixManager.getInstanceName(), 
helixManager.getSessionId(),
-          additionalInfo, classInfo, helixManager.getHelixDataAccessor());
+      logError(errorType, "ErrorInfo", helixManager.getInstanceName(), 
helixManager.getSessionId(), additionalInfo,
+          classInfo, helixManager.getHelixDataAccessor(),
+          helixManager.getInstanceType().equals(InstanceType.CONTROLLER) || 
helixManager.getInstanceType()
+              .equals(InstanceType.CONTROLLER_PARTICIPANT));
     } else {
       _logger.error("Exception while logging error. HelixManager is null.");
     }
   }
 
   private void logError(ErrorType errorType, String updateKey, String 
instanceName,
-      String sessionId, String additionalInfo, Class classInfo, 
HelixDataAccessor accessor) {
+      String sessionId, String additionalInfo, Class classInfo, 
HelixDataAccessor accessor,
+      boolean isController) {
     try {
       ZNRecord record = createEmptyStatusUpdateRecord(sessionId + "__" + 
instanceName);
 
-      Map<String, String> contentMap = new TreeMap<String, String>();
+      Map<String, String> contentMap = new TreeMap<>();
       contentMap.put("AdditionalInfo", additionalInfo);
       contentMap.put("Class", classInfo.toString());
       contentMap.put("SessionId", sessionId);
 
       record.setMapField(generateMapFieldId(Level.HELIX_ERROR, updateKey), 
contentMap);
 
-      publishErrorRecord(record, instanceName, errorType.name(), updateKey, 
sessionId, accessor);
+      publishErrorRecord(record, instanceName, errorType.name(), updateKey, 
sessionId, accessor,
+          isController);
     } catch (Exception e) {
       _logger.error("Exception while logging error", e);
     }
   }
 
+  public void logError(Message message, Class classInfo, String 
additionalInfo, HelixManager manager) {
+    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, 
additionalInfo, manager);
+  }
+
+  public void logError(Message message, Class classInfo, Exception e, String 
additionalInfo,
+      HelixManager manager) {
+    StringWriter sw = new StringWriter();
+    PrintWriter pw = new PrintWriter(sw);
+    e.printStackTrace(pw);
+    logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo,
+        additionalInfo + sw.toString(), manager);
+  }
+
+  public void logInfo(Message message, Class classInfo, String additionalInfo,
+      HelixManager manager) {
+    logMessageStatusUpdateRecord(message, Level.HELIX_INFO, classInfo, 
additionalInfo, manager);
+  }
+
+  public void logWarning(Message message, Class classInfo, String 
additionalInfo,
+      HelixManager manager) {
+    logMessageStatusUpdateRecord(message, Level.HELIX_WARNING, classInfo, 
additionalInfo, manager);
+  }
+
+  @Deprecated
   public void logError(Message message, Class classInfo, String additionalInfo,
       HelixDataAccessor accessor) {
     logMessageStatusUpdateRecord(message, Level.HELIX_ERROR, classInfo, 
additionalInfo, accessor);
   }
 
+  @Deprecated
   public void logError(Message message, Class classInfo, Exception e, String 
additionalInfo,
       HelixDataAccessor accessor) {
     StringWriter sw = new StringWriter();
@@ -380,16 +426,42 @@ public class StatusUpdateUtil {
         additionalInfo + sw.toString(), accessor);
   }
 
+  @Deprecated
   public void logInfo(Message message, Class classInfo, String additionalInfo,
       HelixDataAccessor accessor) {
     logMessageStatusUpdateRecord(message, Level.HELIX_INFO, classInfo, additionalInfo, accessor);
   }
 
+  @Deprecated
   public void logWarning(Message message, Class classInfo, String additionalInfo,
       HelixDataAccessor accessor) {
     logMessageStatusUpdateRecord(message, Level.HELIX_WARNING, classInfo, additionalInfo, accessor);
   }
 
+  private String getStatusUpdateKey(Message message) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
+      return message.getPartitionName();
+    }
+    return message.getMsgId();
+  }
+
+  /**
+   * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
+   */
+  String getStatusUpdateSubPath(Message message) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
+      return message.getResourceName();
+    }
+    return message.getMsgType();
+  }
+
+  String getStatusUpdateRecordName(Message message) {
+    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
+      return message.getTgtSessionId() + "__" + message.getResourceName();
+    }
+    return message.getMsgId();
+  }
+
   /**
    * Write a status update record to zookeeper to the zookeeper store.
    * @param record
@@ -400,9 +472,11 @@ public class StatusUpdateUtil {
    *          the error level of the message update
    * @param accessor
    *          the zookeeper data accessor that writes the status update to zookeeper
+   * @param isController
+   *          if the update is for a controller instance or not
    */
   void publishStatusUpdateRecord(ZNRecord record, Message message, Level level,
-      HelixDataAccessor accessor) {
+      HelixDataAccessor accessor, boolean isController) {
     String instanceName = message.getTgtName();
     String statusUpdateSubPath = getStatusUpdateSubPath(message);
     String statusUpdateKey = getStatusUpdateKey(message);
@@ -416,11 +490,10 @@ public class StatusUpdateUtil {
 
     Builder keyBuilder = accessor.keyBuilder();
     if (!_recordedMessages.containsKey(message.getMsgId())) {
-      // TODO instanceName of a controller might be any string
-      if (instanceName.equalsIgnoreCase("Controller")) {
-        accessor.updateProperty(
-            keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey),
-            new StatusUpdate(createMessageLogRecord(message)));
+      if (isController) {
+        accessor
+            .updateProperty(keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey),
+                new StatusUpdate(createMessageLogRecord(message)));
 
       } else {
 
@@ -442,7 +515,7 @@ public class StatusUpdateUtil {
       _recordedMessages.put(message.getMsgId(), message.getMsgId());
     }
 
-    if (instanceName.equalsIgnoreCase("Controller")) {
+    if (isController) {
       accessor.updateProperty(
           keyBuilder.controllerTaskStatus(statusUpdateSubPath, statusUpdateKey), new StatusUpdate(
               record));
@@ -462,35 +535,10 @@ public class StatusUpdateUtil {
     // If the error level is ERROR, also write the record to "ERROR" ZNode
     if (Level.HELIX_ERROR == level) {
       publishErrorRecord(record, instanceName, statusUpdateSubPath, statusUpdateKey, sessionId,
-          accessor);
+          accessor, isController);
     }
   }
 
-  private String getStatusUpdateKey(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
-      return message.getPartitionName();
-    }
-    return message.getMsgId();
-  }
-
-  /**
-   * Generate the sub-path under STATUSUPDATE or ERROR path for a status update
-   */
-  String getStatusUpdateSubPath(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
-      return message.getResourceName();
-    } else {
-      return message.getMsgType();
-    }
-  }
-
-  String getStatusUpdateRecordName(Message message) {
-    if (message.getMsgType().equalsIgnoreCase(MessageType.STATE_TRANSITION.name())) {
-      return message.getTgtSessionId() + "__" + message.getResourceName();
-    }
-    return message.getMsgId();
-  }
-
   /**
    * Write an error record to zookeeper to the zookeeper store.
    * @param record
@@ -505,12 +553,13 @@ public class StatusUpdateUtil {
    *          the session id
    * @param accessor
    *          the zookeeper data accessor that writes the status update to zookeeper
+   * @param isController
+   *          if the error log is for a controller instance or not
    */
   void publishErrorRecord(ZNRecord record, String instanceName, String updateSubPath,
-      String updateKey, String sessionId, HelixDataAccessor accessor) {
+      String updateKey, String sessionId, HelixDataAccessor accessor, boolean isController) {
     Builder keyBuilder = accessor.keyBuilder();
-    // TODO remove the hard code: "controller"
-    if (instanceName.toLowerCase().startsWith("controller")) {
+    if (isController) {
       // TODO need to fix: ERRORS_CONTROLLER doesn't have a form of
       // ../{sessionId}/{subPath}
       accessor.setProperty(keyBuilder.controllerTaskError(updateSubPath), new Error(record));

Reply via email to