AMBARI-17716. AMS HA not working in secure cluster. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/18cfea6f Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/18cfea6f Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/18cfea6f Branch: refs/heads/trunk Commit: 18cfea6f869fccd7d029640265baeae63b10a51a Parents: 885b7ae Author: Siddharth Wagle <swa...@hortonworks.com> Authored: Thu Jul 14 13:38:19 2016 -0700 Committer: Siddharth Wagle <swa...@hortonworks.com> Committed: Thu Jul 14 16:38:34 2016 -0700 ---------------------------------------------------------------------- .../ambari-metrics-timelineservice/pom.xml | 2 +- .../timeline/TimelineMetricConfiguration.java | 4 ++ .../availability/AggregationTaskRunner.java | 12 ++--- .../MetricCollectorHAController.java | 47 ++++++++++++-------- .../OnlineOfflineStateModelFactory.java | 15 ++++--- .../MetricCollectorHAControllerTest.java | 11 +++-- 6 files changed, 54 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-timelineservice/pom.xml index 7ae6155..9c9a303 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/pom.xml +++ b/ambari-metrics/ambari-metrics-timelineservice/pom.xml @@ -253,7 +253,7 @@ <dependency> <groupId>org.apache.helix</groupId> <artifactId>helix-core</artifactId> - <version>0.6.5</version> + <version>0.7.1</version> <exclusions> <exclusion> <artifactId>zookeeper</artifactId> http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index 6ac5f45..e82d65a 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -399,4 +399,8 @@ public class TimelineMetricConfiguration { return false; } } + + public boolean isSecurityEnabled() { + return hbaseConf.get("hbase.security.authentication", "").equals("kerberos"); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java index cad2a2b..ceab754 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; +import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.participant.StateMachineEngine; import java.util.HashMap; @@ -38,13 +39,13 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.DEFAULT_STATE_MODEL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME; public class AggregationTaskRunner { private final String instanceName; private final String zkAddress; + private final String clusterName; private HelixManager manager; private static final Log LOG = LogFactory.getLog(AggregationTaskRunner.class); private CheckpointManager checkpointManager; @@ -80,20 +81,21 @@ public class AggregationTaskRunner { PARTITION_AGGREGATION_TYPES.put(METRIC_AGGREGATORS + "_1", HOST); } - public AggregationTaskRunner(String instanceName, String zkAddress) { + public AggregationTaskRunner(String instanceName, String zkAddress, String clusterName) { this.instanceName = instanceName; this.zkAddress = zkAddress; + this.clusterName = clusterName; } public void initialize() throws Exception { - manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceName, + manager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress); OnlineOfflineStateModelFactory stateModelFactory = new OnlineOfflineStateModelFactory(instanceName, this); StateMachineEngine stateMach = manager.getStateMachineEngine(); - stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory); + stateMach.registerStateModelFactory(StateModelDefId.from(DEFAULT_STATE_MODEL), stateModelFactory); manager.connect(); checkpointManager = new CheckpointManager(manager.getHelixPropertyStore()); http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java index 18b9059..84e4153 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java @@ -33,8 +33,8 @@ import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.OnlineOfflineSMD; - +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.tools.StateModelConfigGenerator;; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -42,21 +42,21 @@ import java.util.TreeSet; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO; public class MetricCollectorHAController { private static final Log LOG = LogFactory.getLog(MetricCollectorHAController.class); - static final String CLUSTER_NAME = "ambari-metrics-cluster"; + static final String CLUSTER_NAME = "ambari-metrics-cluster-unsecure"; static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS"; - static final String STATE_MODEL_NAME = OnlineOfflineSMD.name; + static final String DEFAULT_STATE_MODEL = "OnlineOffline"; static final String INSTANCE_NAME_DELIMITER = "_"; final String zkConnectUrl; final String instanceHostname; final InstanceConfig instanceConfig; final AggregationTaskRunner aggregationTaskRunner; + final TimelineMetricConfiguration configuration; // Cache list of known live instances final List<String> liveInstanceNames = new ArrayList<>(); @@ -69,6 +69,7 @@ public class MetricCollectorHAController { private volatile boolean isInitialized = false; public MetricCollectorHAController(TimelineMetricConfiguration configuration) { + this.configuration = configuration; String instancePort; try { instanceHostname = configuration.getInstanceHostnameFromEnv(); @@ -99,43 +100,53 @@ public class MetricCollectorHAController { instanceConfig.setHostName(instanceHostname); instanceConfig.setPort(instancePort); instanceConfig.setInstanceEnabled(true); - aggregationTaskRunner = new AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl); + aggregationTaskRunner = new AggregationTaskRunner( + instanceConfig.getInstanceName(), zkConnectUrl, getClusterName()); + } + + /** + * Name of Helix znode + */ + public String getClusterName() { + return CLUSTER_NAME; } /** * Initialize the instance with zookeeper via Helix */ public void initializeHAController() throws Exception { + String clusterName = getClusterName(); admin = new ZKHelixAdmin(zkConnectUrl); // create cluster - LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME); - admin.addCluster(CLUSTER_NAME, false); + LOG.info("Creating zookeeper cluster node: " + clusterName); + admin.addCluster(clusterName, false); // Adding host to the cluster - List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME); + List<String> nodes = admin.getInstancesInCluster(clusterName); if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) { LOG.info("Adding participant instance " + instanceConfig); - admin.addInstance(CLUSTER_NAME, instanceConfig); + admin.addInstance(clusterName, instanceConfig); } // Add a state model - if (admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) { + if (admin.getStateModelDef(clusterName, DEFAULT_STATE_MODEL) == null) { LOG.info("Adding ONLINE-OFFLINE state model to the cluster"); - admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, OnlineOfflineSMD.build()); + admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL, new StateModelDefinition( + StateModelConfigGenerator.generateConfigForOnlineOffline())); } // Add resources with 1 cluster-wide replica // Since our aggregators are unbalanced in terms of work distribution we // only need to distribute writes to METRIC_AGGREGATE and // METRIC_RECORD_MINUTE - List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME); + List<String> resources = admin.getResourcesInCluster(clusterName); if (!resources.contains(METRIC_AGGREGATORS)) { LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions and 1 replicas"); - admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, OnlineOfflineSMD.name, FULL_AUTO.toString()); + admin.addResource(clusterName, METRIC_AGGREGATORS, 2, DEFAULT_STATE_MODEL, FULL_AUTO.toString()); } // this will set up the ideal state, it calculates the preference list for // each partition similar to consistent hashing - admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1); + admin.rebalance(clusterName, METRIC_AGGREGATORS, 1); // Start participant startAggregators(); @@ -173,7 +184,7 @@ public class MetricCollectorHAController { private void startController() throws Exception { manager = HelixManagerFactory.getZKHelixManager( - CLUSTER_NAME, + getClusterName(), instanceHostname, InstanceType.CONTROLLER, zkConnectUrl @@ -245,7 +256,7 @@ public class MetricCollectorHAController { public void printClusterState() { StringBuilder sb = new StringBuilder("\n######################### Cluster HA state ########################"); - ExternalView resourceExternalView = admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS); + ExternalView resourceExternalView = admin.getResourceExternalView(getClusterName(), METRIC_AGGREGATORS); if (resourceExternalView != null) { getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb); } @@ -258,7 +269,7 @@ public class MetricCollectorHAController { StringBuilder sb) { TreeSet<String> sortedSet = new TreeSet<>(resourceExternalView.getPartitionSet()); sb.append("\nCLUSTER: "); - sb.append(CLUSTER_NAME); + sb.append(getClusterName()); sb.append("\nRESOURCE: "); sb.append(resourceName); for (String partitionName : sortedSet) { http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java index 7d3350b..eb63327 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/OnlineOfflineStateModelFactory.java @@ -21,13 +21,14 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator.AGGREGATOR_TYPE; import org.apache.helix.NotificationContext; +import org.apache.helix.api.StateTransitionHandlerFactory; +import org.apache.helix.api.TransitionHandler; +import org.apache.helix.api.id.PartitionId; import org.apache.helix.model.Message; -import org.apache.helix.participant.statemachine.StateModel; -import org.apache.helix.participant.statemachine.StateModelFactory; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.PARTITION_AGGREGATION_TYPES; -public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { +public class OnlineOfflineStateModelFactory extends StateTransitionHandlerFactory<TransitionHandler> { private static final Log LOG = LogFactory.getLog(OnlineOfflineStateModelFactory.class); private final String instanceName; private final AggregationTaskRunner taskRunner; @@ -38,13 +39,13 @@ public class OnlineOfflineStateModelFactory extends StateModelFactory<StateModel } @Override - public StateModel createNewStateModel(String resourceName, String partition) { - LOG.info("Received request to process partition = " + partition + ", for " + - "resource = " + resourceName + ", at " + instanceName); + public TransitionHandler createStateTransitionHandler(PartitionId stateUnitKey) { + LOG.info("Received request to process partition = " + stateUnitKey.stringify() + + ", at " + instanceName); return new OnlineOfflineStateModel(); } - public class OnlineOfflineStateModel extends StateModel { + public class OnlineOfflineStateModel extends TransitionHandler { public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { String partitionName = message.getPartitionName(); LOG.info("Received transition to Online from Offline for partition: " + partitionName); http://git-wip-us.apache.org/repos/asf/ambari/blob/18cfea6f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java index 91ec305..1e4bac0 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java @@ -23,17 +23,16 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; +import org.apache.helix.api.id.StateModelDefId; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.junit.Before; import org.junit.Test; - import java.util.HashMap; import java.util.Map; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.DEFAULT_STATE_MODEL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME; import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.replay; @@ -76,9 +75,9 @@ public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTes HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, instanceConfig2.getInstanceName(), InstanceType.PARTICIPANT, haController.zkConnectUrl); - manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME, + manager2.getStateMachineEngine().registerStateModelFactory(StateModelDefId.from(DEFAULT_STATE_MODEL), new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(), - new AggregationTaskRunner(instanceConfig2.getInstanceName(), ""))); + new AggregationTaskRunner(instanceConfig2.getInstanceName(), "", CLUSTER_NAME))); manager2.connect(); haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);