Repository: incubator-brooklyn Updated Branches: refs/heads/master 8457515f6 -> c6273b891
Refactoring openIptables execution Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/3cc2e5b4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/3cc2e5b4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/3cc2e5b4 Branch: refs/heads/master Commit: 3cc2e5b4e885c6c998b19747d764f767f9065307 Parents: 3965305 Author: Ivana Yovcheva <[email protected]> Authored: Wed Oct 21 13:38:39 2015 +0300 Committer: Ivana Yovcheva <[email protected]> Committed: Tue Oct 27 14:10:26 2015 +0200 ---------------------------------------------------------------------- .../entity/machine/MachineInitTasks.java | 164 ++++++++++++------- 1 file changed, 106 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/3cc2e5b4/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java ---------------------------------------------------------------------- diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java index be791b3..ff540f2 100644 --- a/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java +++ b/software/base/src/main/java/org/apache/brooklyn/entity/machine/MachineInitTasks.java @@ -18,15 +18,12 @@ */ package org.apache.brooklyn.entity.machine; -import java.io.ByteArrayOutputStream; import java.util.List; -import java.util.concurrent.Callable; -import com.google.common.collect.ImmutableMap; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; -import org.apache.brooklyn.util.stream.Streams; +import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,29 +58,35 @@ public class MachineInitTasks { } /** - * Returns a queued {@link Task} which opens the given ports in iptables on the given machine. - */ - public Task<Void> openIptablesAsync(final Iterable<Integer> inboundPorts, final SshMachineLocation machine) { - return DynamicTasks.queue("open iptables "+toTruncatedString(inboundPorts, 6), new Callable<Void>() { - public Void call() { - openIptablesImpl(inboundPorts, machine); - return null; - } - }); - } - - /** * Returns a queued {@link Task} which stops iptables on the given machine. */ public Task<Void> stopIptablesAsync(final SshMachineLocation machine) { - return DynamicTasks.queue("stop iptables", new Callable<Void>() { - public Void call() { + return DynamicTasks.queue("stop iptables", new Runnable() { + public void run() { stopIptablesImpl(machine); - return null; } }); } + protected void stopIptablesImpl(final SshMachineLocation machine) { + + log.info("Stopping iptables for {} at {}", entity(), machine); + + List<String> cmds = ImmutableList.<String>of(); + + Task<Integer> checkFirewall = checkLocationFirewall(machine); + + if (checkFirewall.getUnchecked() == 0) { + cmds = ImmutableList.of(IptablesCommands.firewalldServiceStop(), IptablesCommands.firewalldServiceStatus()); + } else { + cmds = ImmutableList.of(IptablesCommands.iptablesServiceStop(), IptablesCommands.iptablesServiceStatus()); + } + + + subTaskHelperAllowingNonZeroExitCode("execute stop iptables", machine, cmds.toArray(new String[cmds.size()])); + } + + /** * See docs in {@link BashCommands#dontRequireTtyForSudo()} */ @@ -91,6 +94,17 @@ public class MachineInitTasks { return DynamicTasks.queue(SshTasks.dontRequireTtyForSudo(machine, true).newTask().asTask()); } + /** + * Returns a queued {@link Task} which opens the given ports in iptables on the given machine. + */ + public Task<Void> openIptablesAsync(final Iterable<Integer> inboundPorts, final SshMachineLocation machine) { + return DynamicTasks.queue("open iptables "+toTruncatedString(inboundPorts, 6), new Runnable() { + public void run() { + openIptablesImpl(inboundPorts, machine); + } + }); + } + protected void openIptablesImpl(Iterable<Integer> inboundPorts, SshMachineLocation machine) { if (inboundPorts == null || Iterables.isEmpty(inboundPorts)) { log.info("No ports to open in iptables (no inbound ports) for {} at {}", machine, this); @@ -98,47 +112,91 @@ public class MachineInitTasks { log.info("Opening ports in iptables for {} at {}", entity(), machine); List<String> iptablesRules = Lists.newArrayList(); + String iptablesInstallCommands = null; - if (isLocationFirewalldEnabled(machine)) { + Task<Integer> checkFirewall = checkLocationFirewall(machine); + + if (checkFirewall.getUnchecked() == 0) { for (Integer port : inboundPorts) { iptablesRules.add(IptablesCommands.addFirewalldRule(Chain.INPUT, Protocol.TCP, port, Policy.ACCEPT)); } } else { iptablesRules = createIptablesRulesForNetworkInterface(inboundPorts); - iptablesRules.add(IptablesCommands.saveIptablesRules()); - } - List<String> batch = Lists.newArrayList(); - - ByteArrayOutputStream outStream = new ByteArrayOutputStream(); - ByteArrayOutputStream errStream = new ByteArrayOutputStream(); - Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDOUT, outStream)); - Tasks.addTagDynamically(BrooklynTaskTags.tagForStreamSoft(BrooklynTaskTags.STREAM_STDERR, errStream)); - // Some entities, such as Riak (erlang based) have a huge range of ports, which leads to a script that - // is too large to run (fails with a broken pipe). Batch the rules into batches of 50 - for (String rule : iptablesRules) { - batch.add(rule); - if (batch.size() == 50) { - machine.execCommands(ImmutableMap.of("out", outStream, "err", errStream), "Inserting iptables rules, 50 command batch", batch); - batch.clear(); - } + iptablesInstallCommands = IptablesCommands.saveIptablesRules(); } - if (batch.size() > 0) { - machine.execCommands(ImmutableMap.of("out", outStream, "err", errStream), "Inserting iptables rules", batch); - } - machine.execCommands(ImmutableMap.of("out", outStream, "err", errStream), "List iptables rules", ImmutableList.of(IptablesCommands.listIptablesRule())); + + insertIptablesRules(iptablesRules, iptablesInstallCommands, machine); + listIptablesRules(machine); } } - protected void stopIptablesImpl(SshMachineLocation machine) { - log.info("Stopping iptables for {} at {}", entity(), machine); + /** + * Returns a queued {@link Task} which checks if location firewall is enabled. + */ + public Task<Integer> checkLocationFirewall(final SshMachineLocation machine) { + return subTaskHelperAllowingNonZeroExitCode("check if firewall is active", machine, IptablesCommands.firewalldServiceIsActive()); + } - List<String> cmds = ImmutableList.<String>of(); - if (isLocationFirewalldEnabled(machine)) { - cmds = ImmutableList.of(IptablesCommands.firewalldServiceStop(), IptablesCommands.firewalldServiceStatus()); - } else { - cmds = ImmutableList.of(IptablesCommands.iptablesServiceStop(), IptablesCommands.iptablesServiceStatus()); + /** + * Returns a queued {@link Task} which inserts iptables rules. + */ + private Task<Void> insertIptablesRules(final List<String> iptablesRules, final String installCommands, final SshMachineLocation machine) { + return DynamicTasks.queue("insert rules", new Runnable() { + public void run() { + insertIptablesRulesImpl(iptablesRules, installCommands, machine); + } + }); + } + + private void insertIptablesRulesImpl(List<String> iptablesRules, String installCommands, SshMachineLocation machine) { + + // Some entities, such as Riak (erlang based) have a huge range of ports, which leads to a script that + // is too large to run (fails with a broken pipe). Batch the rules into batches of 100 + List<List<String> > batches = Lists.partition(iptablesRules, 100); + + int batchNumber = 0; + for (List<String> batch : batches) { + batchNumber++; + insertIptablesRulesOnCommandBatches(batch, machine, batchNumber); } - machine.execCommands("Stopping iptables", cmds); + if (installCommands != null) { + serviceIptablesSave(installCommands, machine); + } + } + + /** + * Returns a queued {@link Task} which inserts iptables rules on command batches. + */ + private Task<Integer> insertIptablesRulesOnCommandBatches(final List<String> commandsBatch, final SshMachineLocation machine, int batchNumber) { + return subTaskHelperRequiringZeroExitCode("commands batch " + batchNumber, machine, commandsBatch.toArray(new String[commandsBatch.size()])); + } + + /** + * Returns a queued {@link Task} which runs iptables save commands. + */ + private Task<Integer> serviceIptablesSave(final String installCommands, final SshMachineLocation machine) { + return subTaskHelperRequiringZeroExitCode("save", machine, installCommands); + } + + /** + * Returns a queued {@link Task} which lists the iptables rules. + */ + private Task<Integer> listIptablesRules(final SshMachineLocation machine) { + return subTaskHelperRequiringZeroExitCode("list rules", machine, IptablesCommands.listIptablesRule()); + } + + private Task<Integer> subTaskHelperRequiringZeroExitCode(String taskName, SshMachineLocation machine, String... comands) { + ProcessTaskFactory<Integer> taskFactory = SshTasks.newSshExecTaskFactory(machine, comands) + .summary(taskName) + .requiringExitCodeZero(); + return DynamicTasks.queue(taskFactory).asTask(); + } + + private Task<Integer> subTaskHelperAllowingNonZeroExitCode(String taskName, SshMachineLocation machine, String... comands) { + ProcessTaskFactory<Integer> taskFactory = SshTasks.newSshExecTaskFactory(machine, comands) + .summary(taskName) + .allowingNonZeroExitCode(); + return DynamicTasks.queue(taskFactory).asTask(); } private List<String> createIptablesRulesForNetworkInterface(Iterable<Integer> ports) { @@ -148,16 +206,6 @@ public class MachineInitTasks { } return iptablesRules; } - - public boolean isLocationFirewalldEnabled(SshMachineLocation location) { - int result = location.execCommands("checking if firewalld is active", - ImmutableList.of(IptablesCommands.firewalldServiceIsActive())); - if (result == 0) { - return true; - } - - return false; - } protected String toTruncatedString(Iterable<?> vals, int maxShown) { StringBuilder result = new StringBuilder("[");
