AMBARI-17716. AMS HA not working in secure cluster. (swagle)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/18cfea6f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/18cfea6f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/18cfea6f

Branch: refs/heads/trunk
Commit: 18cfea6f869fccd7d029640265baeae63b10a51a
Parents: 885b7ae
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Thu Jul 14 13:38:19 2016 -0700
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Thu Jul 14 16:38:34 2016 -0700

----------------------------------------------------------------------
 .../ambari-metrics-timelineservice/pom.xml      |  2 +-
 .../timeline/TimelineMetricConfiguration.java   |  4 ++
 .../availability/AggregationTaskRunner.java     | 12 ++---
 .../MetricCollectorHAController.java            | 47 ++++++++++++--------
 .../OnlineOfflineStateModelFactory.java         | 15 ++++---
 .../MetricCollectorHAControllerTest.java        | 11 +++--
 6 files changed, 54 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml 
b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
index 7ae6155..9c9a303 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml
@@ -253,7 +253,7 @@
     <dependency>
       <groupId>org.apache.helix</groupId>
       <artifactId>helix-core</artifactId>
-      <version>0.6.5</version>
+      <version>0.7.1</version>
       <exclusions>
         <exclusion>
           <artifactId>zookeeper</artifactId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 6ac5f45..e82d65a 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -399,4 +399,8 @@ public class TimelineMetricConfiguration {
       return false;
     }
   }
+
+  public boolean isSecurityEnabled() {
+    return hbaseConf.get("hbase.security.authentication", 
"").equals("kerberos");
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
index cad2a2b..ceab754 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
@@ -23,6 +23,7 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.participant.StateMachineEngine;
 
 import java.util.HashMap;
@@ -38,13 +39,13 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.DEFAULT_STATE_MODEL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME;
 
 public class AggregationTaskRunner {
   private final String instanceName;
   private final String zkAddress;
+  private final String clusterName;
   private HelixManager manager;
   private static final Log LOG = 
LogFactory.getLog(AggregationTaskRunner.class);
   private CheckpointManager checkpointManager;
@@ -80,20 +81,21 @@ public class AggregationTaskRunner {
     PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST);
   }
 
-  public AggregationTaskRunner(String instanceName, String zkAddress) {
+  public AggregationTaskRunner(String instanceName, String zkAddress, String 
clusterName) {
     this.instanceName = instanceName;
     this.zkAddress = zkAddress;
+    this.clusterName = clusterName;
   }
 
   public void initialize() throws Exception {
-    manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName,
+    manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
       InstanceType.PARTICIPANT, zkAddress);
 
     OnlineOfflineStateModelFactory stateModelFactory =
       new OnlineOfflineStateModelFactory(instanceName, this);
 
     StateMachineEngine stateMach = manager.getStateMachineEngine();
-    stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
+    
stateMach.registerStateModelFactory(StateModelDefId.from(DEFAULT_STATE_MODEL), 
stateModelFactory);
     manager.connect();
 
     checkpointManager = new CheckpointManager(manager.getHelixPropertyStore());

http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
index 18b9059..84e4153 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
@@ -33,8 +33,8 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.OnlineOfflineSMD;
-
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -42,21 +42,21 @@ import java.util.TreeSet;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
 
 public class MetricCollectorHAController {
   private static final Log LOG = 
LogFactory.getLog(MetricCollectorHAController.class);
 
-  static final String CLUSTER_NAME = "ambari-metrics-cluster";
+  static final String CLUSTER_NAME = "ambari-metrics-cluster-unsecure";
   static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
-  static final String STATE_MODEL_NAME = OnlineOfflineSMD.name;
+  static final String DEFAULT_STATE_MODEL = "OnlineOffline";
   static final String INSTANCE_NAME_DELIMITER = "_";
 
   final String zkConnectUrl;
   final String instanceHostname;
   final InstanceConfig instanceConfig;
   final AggregationTaskRunner aggregationTaskRunner;
+  final TimelineMetricConfiguration configuration;
 
   // Cache list of known live instances
   final List<String> liveInstanceNames = new ArrayList<>();
@@ -69,6 +69,7 @@ public class MetricCollectorHAController {
   private volatile boolean isInitialized = false;
 
   public MetricCollectorHAController(TimelineMetricConfiguration 
configuration) {
+    this.configuration = configuration;
     String instancePort;
     try {
       instanceHostname = configuration.getInstanceHostnameFromEnv();
@@ -99,43 +100,53 @@ public class MetricCollectorHAController {
     instanceConfig.setHostName(instanceHostname);
     instanceConfig.setPort(instancePort);
     instanceConfig.setInstanceEnabled(true);
-    aggregationTaskRunner = new 
AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl);
+    aggregationTaskRunner = new AggregationTaskRunner(
+      instanceConfig.getInstanceName(), zkConnectUrl, getClusterName());
+  }
+
+  /**
+   * Name of Helix znode
+   */
+  public String getClusterName() {
+    return CLUSTER_NAME;
   }
 
   /**
    * Initialize the instance with zookeeper via Helix
    */
   public void initializeHAController() throws Exception {
+    String clusterName = getClusterName();
     admin = new ZKHelixAdmin(zkConnectUrl);
     // create cluster
-    LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME);
-    admin.addCluster(CLUSTER_NAME, false);
+    LOG.info("Creating zookeeper cluster node: " + clusterName);
+    admin.addCluster(clusterName, false);
 
     // Adding host to the cluster
-    List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+    List<String> nodes = admin.getInstancesInCluster(clusterName);
     if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) {
       LOG.info("Adding participant instance " + instanceConfig);
-      admin.addInstance(CLUSTER_NAME, instanceConfig);
+      admin.addInstance(clusterName, instanceConfig);
     }
 
     // Add a state model
-    if (admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) {
+    if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) {
       LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
-      admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, 
OnlineOfflineSMD.build());
+      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new 
StateModelDefinition(
+        StateModelConfigGenerator.generateConfigForOnlineOffline()));
     }
 
     // Add resources with 1 cluster-wide replica
     // Since our aggregators are unbalanced in terms of work distribution we
     // only need to distribute writes to METRIC_AGGREGATE and
     // METRIC_RECORD_MINUTE
-    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
+    List<String> resources = admin.getResourcesInCluster(clusterName);
     if (!resources.contains(METRIC_AGGREGATORS)) {
       LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions 
and 1 replicas");
-      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, 
OnlineOfflineSMD.name, FULL_AUTO.toString());
+      admin.addResource(clusterName, METRIC_AGGREGATORS, 2, 
DEFAULT_STATE_MODEL, FULL_AUTO.toString());
     }
     // this will set up the ideal state, it calculates the preference list for
     // each partition similar to consistent hashing
-    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+    admin.rebalance(clusterName, METRIC_AGGREGATORS, 1);
 
     // Start participant
     startAggregators();
@@ -173,7 +184,7 @@ public class MetricCollectorHAController {
 
   private void startController() throws Exception {
     manager = HelixManagerFactory.getZKHelixManager(
-      CLUSTER_NAME,
+      getClusterName(),
       instanceHostname,
       InstanceType.CONTROLLER,
       zkConnectUrl
@@ -245,7 +256,7 @@ public class MetricCollectorHAController {
   public void printClusterState() {
     StringBuilder sb = new StringBuilder("\n######################### Cluster 
HA state ########################");
 
-    ExternalView resourceExternalView = 
admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+    ExternalView resourceExternalView = 
admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS);
     if (resourceExternalView != null) {
       getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
     }
@@ -258,7 +269,7 @@ public class MetricCollectorHAController {
                                          StringBuilder sb) {
     TreeSet<String> sortedSet = new 
TreeSet<>(resourceExternalView.getPartitionSet());
     sb.append("\nCLUSTER: ");
-    sb.append(CLUSTER_NAME);
+    sb.append(getClusterName());
     sb.append("\nRESOURCE: ");
     sb.append(resourceName);
     for (String partitionName : sortedSet) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
index 7d3350b..eb63327 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java
@@ -21,13 +21,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.StateTransitionHandlerFactory;
+import org.apache.helix.api.TransitionHandler;
+import org.apache.helix.api.id.PartitionId;
 import org.apache.helix.model.Message;
-import org.apache.helix.participant.statemachine.StateModel;
-import org.apache.helix.participant.statemachine.StateModelFactory;
 
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES;
 
-public class OnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel> {
+public class OnlineOfflineStateModelFactory extends 
StateTransitionHandlerFactory<TransitionHandler> {
   private static final Log LOG = 
LogFactory.getLog(OnlineOfflineStateModelFactory.class);
   private final String instanceName;
   private final AggregationTaskRunner taskRunner;
@@ -38,13 +39,13 @@ public class OnlineOfflineStateModelFactory extends 
StateModelFactory<StateModel
   }
 
   @Override
-  public StateModel createNewStateModel(String resourceName, String partition) 
{
-    LOG.info("Received request to process partition = " + partition + ", for " 
+
-      "resource = " + resourceName + ", at " + instanceName);
+  public TransitionHandler createStateTransitionHandler(PartitionId 
stateUnitKey) {
+    LOG.info("Received request to process partition = " + 
stateUnitKey.stringify()
+      + ", at " + instanceName);
     return new OnlineOfflineStateModel();
   }
 
-  public class OnlineOfflineStateModel extends StateModel {
+  public class OnlineOfflineStateModel extends TransitionHandler {
     public void onBecomeOnlineFromOffline(Message message, NotificationContext 
context) {
       String partitionName = message.getPartitionName();
       LOG.info("Received transition to Online from Offline for partition: " + 
partitionName);

http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
index 91ec305..1e4bac0 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
@@ -23,17 +23,16 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.api.id.StateModelDefId;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.util.HashMap;
 import java.util.Map;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.DEFAULT_STATE_MODEL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
@@ -76,9 +75,9 @@ public class MetricCollectorHAControllerTest extends 
AbstractMiniHBaseClusterTes
     HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
       instanceConfig2.getInstanceName(),
       InstanceType.PARTICIPANT, haController.zkConnectUrl);
-    
manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME,
+    
manager2.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(DEFAULT_STATE_MODEL),
       new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(),
-        new AggregationTaskRunner(instanceConfig2.getInstanceName(), "")));
+        new AggregationTaskRunner(instanceConfig2.getInstanceName(), "", 
CLUSTER_NAME)));
     manager2.connect();
     haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
 

Reply via email to