Updated Branches:
  refs/heads/helix-provisioning 9386a4cbc -> 37dd3cf4b


Add some basic checks to the provisioning stage


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/37dd3cf4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/37dd3cf4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/37dd3cf4

Branch: refs/heads/helix-provisioning
Commit: 37dd3cf4bb37dd0d7a7fa55766e5115cecb184c8
Parents: 9386a4c
Author: Kanak Biscuitwala <[email protected]>
Authored: Mon Jan 13 16:19:18 2014 -0800
Committer: Kanak Biscuitwala <[email protected]>
Committed: Mon Jan 13 16:19:18 2014 -0800

----------------------------------------------------------------------
 .../provisioner/ParticipantService.java         |  6 +--
 .../stages/ContainerProvisioningStage.java      | 55 +++++++++++++++-----
 2 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/37dd3cf4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
index 92b5a24..bfcce06 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
@@ -2,9 +2,9 @@ package org.apache.helix.controller.provisioner;
 
 public interface ParticipantService {
 
-  boolean init(ServiceConfig serviceConfig);
-  
+  // boolean init(ServiceConfig serviceConfig);
+
   boolean start();
-  
+
   boolean stop();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/37dd3cf4/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 499f904..a17251d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -123,14 +123,15 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
 
           ListenableFuture<ContainerId> future = provisioner.allocateContainer(spec);
-          Futures.addCallback(future, new FutureCallback<ContainerId>() {
+          FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() {
             @Override
             public void onSuccess(ContainerId containerId) {
               InstanceConfig existingInstance =
-                  helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+                  helixAdmin
+                      .getInstanceConfig(cluster.getId().toString(), participantId.toString());
               existingInstance.setContainerId(containerId);
               existingInstance.setContainerState(ContainerState.ACQUIRED);
-              accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
+              accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()),
                   existingInstance);
             }
 
@@ -140,7 +141,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
                   ContainerState.FAILED);
             }
-          });
+          };
+          safeAddCallback(future, callback);
         }
 
         // start new containers
@@ -151,11 +153,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setContainerId(containerId);
           existingInstance.setContainerState(ContainerState.CONNECTING);
-          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // create the helix participant and add it to cluster
           ListenableFuture<Boolean> future = provisioner.startContainer(containerId, participant);
-          Futures.addCallback(future, new FutureCallback<Boolean>() {
+          FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
@@ -169,7 +171,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
                   ContainerState.FAILED);
             }
-          });
+          };
+          safeAddCallback(future, callback);
         }
 
         // release containers
@@ -180,11 +183,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
                   .toString());
           final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setContainerState(ContainerState.FINALIZING);
-          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // remove the participant
           ListenableFuture<Boolean> future = provisioner.deallocateContainer(containerId);
-          Futures.addCallback(future, new FutureCallback<Boolean>() {
+          FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
               InstanceConfig existingInstance =
@@ -200,7 +203,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
                   ContainerState.FAILED);
             }
-          });
+          };
+          safeAddCallback(future, callback);
         }
 
         // stop but don't remove
@@ -212,11 +216,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setInstanceEnabled(false);
           existingInstance.setContainerState(ContainerState.TEARDOWN);
-          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // stop the container
           ListenableFuture<Boolean> future = provisioner.stopContainer(containerId);
-          Futures.addCallback(future, new FutureCallback<Boolean>() {
+          FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
@@ -231,18 +235,41 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
                   ContainerState.FAILED);
             }
-          });
+          };
+          safeAddCallback(future, callback);
         }
       }
     }
   }
 
+  /**
+   * Update a participant with a new container state
+   * @param helixAdmin
+   * @param accessor
+   * @param keyBuilder
+   * @param cluster
+   * @param participantId
+   * @param state
+   */
   private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor accessor,
       PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId participantId,
       ContainerState state) {
     InstanceConfig existingInstance =
         helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
     existingInstance.setContainerState(state);
-    accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+    accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+  }
+
+  /**
+   * Add a callback, failing if the add fails
+   * @param future the future to listen on
+   * @param callback the callback to invoke
+   */
+  private <T> void safeAddCallback(ListenableFuture<T> future, FutureCallback<T> callback) {
+    try {
+      Futures.addCallback(future, callback);
+    } catch (Throwable t) {
+      callback.onFailure(t);
+    }
   }
 }
