AMBARI-12702: Express Upgrade: Parallelize restarts (jluniya)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b4468ce4 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b4468ce4 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b4468ce4 Branch: refs/heads/branch-dev-patch-upgrade Commit: b4468ce44b28bc72a64681e6dc7617b4a2812792 Parents: 049934f Author: Jayush Luniya <jlun...@hortonworks.com> Authored: Thu Oct 22 19:31:32 2015 -0700 Committer: Jayush Luniya <jlun...@hortonworks.com> Committed: Thu Oct 22 19:31:32 2015 -0700 ---------------------------------------------------------------------- .../ambari/server/state/UpgradeHelper.java | 38 +++++++++++--------- .../state/stack/upgrade/ClusterGrouping.java | 2 +- .../state/stack/upgrade/ColocatedGrouping.java | 2 +- .../server/state/stack/upgrade/Grouping.java | 19 +++++++--- .../stack/upgrade/ServiceCheckGrouping.java | 2 +- .../stack/upgrade/StageWrapperBuilder.java | 2 +- .../stack/upgrade/StageWrapperBuilderTest.java | 2 +- 7 files changed, 40 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b4468ce4/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java index fd92d21..ddfc36e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/UpgradeHelper.java @@ -286,10 +286,26 @@ public class UpgradeHelper { groupHolder.skippable = true; } + // Attempt to get the function of the group, during a NonRolling Upgrade + Task.Type functionName = null; + boolean scheduleInParallel = false; // NonRolling defaults to not performing service checks on a group. // Of course, a Service Check Group does indeed run them. if (upgradePack.getType() == UpgradeType.NON_ROLLING) { group.performServiceCheck = false; + + if (RestartGrouping.class.isInstance(group)) { + functionName = ((RestartGrouping) group).getFunction(); + scheduleInParallel = true; + } + if (StartGrouping.class.isInstance(group)) { + functionName = ((StartGrouping) group).getFunction(); + scheduleInParallel = true; + } + if (StopGrouping.class.isInstance(group)) { + functionName = ((StopGrouping) group).getFunction(); + scheduleInParallel = true; + } } StageWrapperBuilder builder = group.getBuilder(); @@ -311,19 +327,7 @@ public class UpgradeHelper { if (upgradePack.getType() == UpgradeType.ROLLING && !allTasks.containsKey(service.serviceName)) { continue; } - - // Attempt to get the function of the group, during a NonRolling Upgrade - Task.Type functionName = null; - if (RestartGrouping.class.isInstance(group)) { - functionName = ((RestartGrouping) group).getFunction(); - } - if (StartGrouping.class.isInstance(group)) { - functionName = ((StartGrouping) group).getFunction(); - } - if (StopGrouping.class.isInstance(group)) { - functionName = ((StopGrouping) group).getFunction(); - } for (String component : service.components) { // Rolling Upgrade has exactly one task for a Component. @@ -392,7 +396,7 @@ public class UpgradeHelper { hostsType.hosts = order; builder.add(context, hostsType, service.serviceName, - svc.isClientOnlyService(), pc, null); + svc.isClientOnlyService(), pc, null, false); } break; case NON_ROLLING: @@ -417,21 +421,21 @@ public class UpgradeHelper { builder.add(context, ht1, service.serviceName, - svc.isClientOnlyService(), pc, h1Params); + svc.isClientOnlyService(), pc, h1Params, false); builder.add(context, ht2, service.serviceName, - svc.isClientOnlyService(), pc, h2Params); + svc.isClientOnlyService(), pc, h2Params, false); } else { // If no NameNode HA, then don't need to change hostsType.hosts since there should be exactly one. builder.add(context, hostsType, service.serviceName, - svc.isClientOnlyService(), pc, null); + svc.isClientOnlyService(), pc, null, false); } break; } } else { builder.add(context, hostsType, service.serviceName, - svc.isClientOnlyService(), pc, null); + svc.isClientOnlyService(), pc, null, scheduleInParallel); } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/b4468ce4/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java index 0e9d2c8..6137285 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ClusterGrouping.java @@ -108,7 +108,7 @@ public class ClusterGrouping extends Grouping { @Override public void add(UpgradeContext ctx, HostsType hostsType, String service, - boolean clientOnly, ProcessingComponent pc, Map<String, String> params) { + boolean clientOnly, ProcessingComponent pc, Map<String, String> params, boolean scheduleInParallel) { // !!! no-op in this case } http://git-wip-us.apache.org/repos/asf/ambari/blob/b4468ce4/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java index 11e9267..8218162 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ColocatedGrouping.java @@ -81,7 +81,7 @@ public class ColocatedGrouping extends Grouping { @Override public void add(UpgradeContext ctx, HostsType hostsType, String service, - boolean clientOnly, ProcessingComponent pc, Map<String, String> params) { + boolean clientOnly, ProcessingComponent pc, Map<String, String> params, boolean scheduleInParallel) { boolean forUpgrade = ctx.getDirection().isUpgrade(); http://git-wip-us.apache.org/repos/asf/ambari/blob/b4468ce4/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java index cd3ee68..fd54ed8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/Grouping.java @@ -89,7 +89,7 @@ public class Grouping { */ @Override public void add(UpgradeContext ctx, HostsType hostsType, String service, - boolean clientOnly, ProcessingComponent pc, Map<String, String> params) { + boolean clientOnly, ProcessingComponent pc, Map<String, String> params, boolean scheduleInParallel) { boolean forUpgrade = ctx.getDirection().isUpgrade(); @@ -112,14 +112,23 @@ public class Grouping { // Add the processing component if (null != pc.tasks && 1 == pc.tasks.size()) { Task t = pc.tasks.get(0); - - for (String hostName : hostsType.hosts) { + if(scheduleInParallel) { + // Create single stage for all StageWrapper stage = new StageWrapper( t.getStageWrapperType(), - getStageText(t.getActionVerb(), ctx.getComponentDisplay(service, pc.name), Collections.singleton(hostName)), + getStageText(t.getActionVerb(), ctx.getComponentDisplay(service, pc.name), hostsType.hosts), params, - new TaskWrapper(service, pc.name, Collections.singleton(hostName), params, t)); + new TaskWrapper(service, pc.name, hostsType.hosts, params, t)); m_stages.add(stage); + } else { + for (String hostName : hostsType.hosts) { + StageWrapper stage = new StageWrapper( + t.getStageWrapperType(), + getStageText(t.getActionVerb(), ctx.getComponentDisplay(service, pc.name), Collections.singleton(hostName)), + params, + new TaskWrapper(service, pc.name, Collections.singleton(hostName), params, t)); + m_stages.add(stage); + } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/b4468ce4/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java index 0033185..19fabe8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/ServiceCheckGrouping.java @@ -103,7 +103,7 @@ public class ServiceCheckGrouping extends Grouping { */ @Override public void add(UpgradeContext ctx, HostsType hostsType, String service, - boolean clientOnly, ProcessingComponent pc, Map<String, String> params) { + boolean clientOnly, ProcessingComponent pc, Map<String, String> params, boolean scheduleInParallel) { // !!! nothing to do here } http://git-wip-us.apache.org/repos/asf/ambari/blob/b4468ce4/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java index 6ef0980..587ce55 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilder.java @@ -71,7 +71,7 @@ public abstract class StageWrapperBuilder { * additional parameters */ public abstract void add(UpgradeContext upgradeContext, HostsType hostsType, String service, - boolean clientOnly, ProcessingComponent pc, Map<String, String> params); + boolean clientOnly, ProcessingComponent pc, Map<String, String> params, boolean scheduleInParallel); /** * Builds the stage wrappers, including any pre- and post-procesing that needs http://git-wip-us.apache.org/repos/asf/ambari/blob/b4468ce4/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java index 94a5336..6fcf7ce 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/stack/upgrade/StageWrapperBuilderTest.java @@ -126,7 +126,7 @@ public class StageWrapperBuilderTest { */ @Override public void add(UpgradeContext upgradeContext, HostsType hostsType, String service, - boolean clientOnly, ProcessingComponent pc, Map<String, String> params) { + boolean clientOnly, ProcessingComponent pc, Map<String, String> params, boolean scheduleInParallel) { } /**