http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java ---------------------------------------------------------------------- diff --git a/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java new file mode 100644 index 0000000..08fba87 --- /dev/null +++ b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.monitoring.zabbix; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.net.URI; +import java.net.URL; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.http.client.HttpClient; +import org.apache.http.impl.NoConnectionReuseStrategy; +import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.EntityFunctions; +import brooklyn.entity.basic.EntityLocal; +import brooklyn.event.feed.AbstractFeed; +import brooklyn.event.feed.AttributePollHandler; +import brooklyn.event.feed.PollHandler; +import brooklyn.event.feed.Poller; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.location.Location; +import brooklyn.location.MachineLocation; +import brooklyn.location.access.BrooklynAccessUtils; +import brooklyn.location.basic.SupportsPortForwarding; +import brooklyn.util.http.HttpTool; +import brooklyn.util.http.HttpToolResponse; +import brooklyn.util.net.Cidr; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.base.Objects; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicates; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.net.HostAndPort; +import com.google.common.reflect.TypeToken; +import com.google.gson.JsonObject; + +public class ZabbixFeed extends AbstractFeed { + + public static final Logger log = LoggerFactory.getLogger(ZabbixFeed.class); + + public static final String JSON_ITEM_GET = + "{ \"jsonrpc\":\"2.0\",\"method\":\"item.get\"," + + "\"params\":{\"output\":\"extend\"," + + "\"filter\":{\"hostid\":[\"{{hostId}}\"],\"key_\":\"{{itemKey}}\"}}," + + "\"auth\":\"{{token}}\",\"id\":{{id}}}"; + public static final String JSON_USER_LOGIN = + "{ \"jsonrpc\":\"2.0\",\"method\":\"user.login\"," + + "\"params\":{\"user\":\"{{username}}\",\"password\":\"{{password}}\"}," + + "\"id\":0 }"; + public static final String JSON_HOST_CREATE = + "{ \"jsonrpc\":\"2.0\",\"method\":\"host.create\"," + + "\"params\":{\"host\":\"{{host}}\"," + + "\"interfaces\":[{\"type\":1,\"main\":1,\"useip\":1,\"ip\":\"{{ip}}\",\"dns\":\"\",\"port\":\"{{port}}\"}]," + + "\"groups\":[{\"groupid\":\"{{groupId}}\"}]," + + "\"templates\":[{\"templateid\":\"{{templateId}}\"}]}," + + "\"auth\":\"{{token}}\",\"id\":{{id}}}"; + + private static final AtomicInteger id = new AtomicInteger(0); + + @SuppressWarnings("serial") + public static final ConfigKey<Set<ZabbixPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( + new TypeToken<Set<ZabbixPollConfig<?>>>() {}, + "polls"); + + @SuppressWarnings("serial") + public static final ConfigKey<Supplier<URI>> BASE_URI_PROVIDER = ConfigKeys.newConfigKey( + new TypeToken<Supplier<URI>>() {}, + "baseUriProvider"); + + public static final ConfigKey<Integer> GROUP_ID = ConfigKeys.newIntegerConfigKey("groupId"); + + public static final ConfigKey<Integer> TEMPLATE_ID = ConfigKeys.newIntegerConfigKey("templateId"); + + @SuppressWarnings("serial") + public static final ConfigKey<Function<? super EntityLocal, String>> UNIQUE_HOSTNAME_GENERATOR = ConfigKeys.newConfigKey( + new TypeToken<Function<? super EntityLocal, String>>() {}, + "uniqueHostnameGenerator"); + + public static Builder<ZabbixFeed, ?> builder() { + return new ConcreteBuilder(); + } + + private static class ConcreteBuilder extends Builder<ZabbixFeed, ConcreteBuilder> { + } + + public static class Builder<T extends ZabbixFeed, B extends Builder<T,B>> { + private EntityLocal entity; + private Supplier<URI> baseUriProvider; + private long period = 500; + private TimeUnit periodUnits = TimeUnit.MILLISECONDS; + private List<ZabbixPollConfig<?>> polls = Lists.newArrayList(); + private URI baseUri; + private boolean suspended = false; + private volatile boolean built; + private ZabbixServer server; + private String username; + private String password; + private Integer sessionTimeout; + private Integer groupId; + private Integer templateId; + private Function<? super EntityLocal, String> uniqueHostnameGenerator = Functions.compose( + EntityFunctions.id(), + EntityFunctions.locationMatching(Predicates.instanceOf(MachineLocation.class))); + private String uniqueTag; + + @SuppressWarnings("unchecked") + protected B self() { + return (B) this; + } + + public B entity(EntityLocal val) { + this.entity = val; + return self(); + } + public B baseUri(Supplier<URI> val) { + if (baseUri!=null && val!=null) + throw new IllegalStateException("Builder cannot take both a URI and a URI Provider"); + this.baseUriProvider = val; + return self(); + } + public B baseUri(URI val) { + if (baseUriProvider!=null && val!=null) + throw new IllegalStateException("Builder cannot take both a URI and a URI Provider"); + this.baseUri = val; + return self(); + } + public B baseUrl(URL val) { + return baseUri(URI.create(val.toString())); + } + public B baseUri(String val) { + return baseUri(URI.create(val)); + } + public B period(long millis) { + return period(millis, TimeUnit.MILLISECONDS); + } + public B period(long val, TimeUnit units) { + this.period = val; + this.periodUnits = units; + return self(); + } + public B poll(ZabbixPollConfig<?> config) { + polls.add(config); + return self(); + } + public B suspended() { + return suspended(true); + } + public B suspended(boolean startsSuspended) { + this.suspended = startsSuspended; + return self(); + } + + public B server(final ZabbixServer server) { + this.server = server; + baseUri(URI.create(server.getConfig(ZabbixServer.ZABBIX_SERVER_API_URL))); + username(server.getConfig(ZabbixServer.ZABBIX_SERVER_USERNAME)); + password(server.getConfig(ZabbixServer.ZABBIX_SERVER_PASSWORD)); + sessionTimeout(server.getConfig(ZabbixServer.ZABBIX_SESSION_TIMEOUT)); + return self(); + } + public B username(String username) { + this.username = username; + return self(); + } + public B password(String password) { + this.password = password; + return self(); + } + public B sessionTimeout(Integer sessionTimeout) { + this.sessionTimeout = sessionTimeout; + return self(); + } + public B groupId(Integer groupId) { + this.groupId = groupId; + return self(); + } + public B templateId(Integer templateId) { + this.templateId = templateId; + return self(); + } + public B register(Integer groupId, Integer templateId) { + this.groupId = groupId; + this.templateId = templateId; + return self(); + } + /** + * For generating the name to be used when registering the zabbix agent with the zabbix server. + * When called, guarantees that the entity will have a {@link MachineLocation} (see {@link Entity#getLocations()}). + * Must return a non-empty string that will be unique across all machines where zabbix agents are installed. + */ + public B uniqueHostnameGenerator(Function<? super EntityLocal, String> val) { + this.uniqueHostnameGenerator = checkNotNull(val, "uniqueHostnameGenerator"); + return self(); + } + + public Builder uniqueTag(String uniqueTag) { + this.uniqueTag = uniqueTag; + return this; + } + + @SuppressWarnings("unchecked") + public T build() { + // If server not set and other config not available, try to obtain from entity config + if (server == null + && (baseUri == null || baseUriProvider == null) + && username == null && password == null && sessionTimeout == null) { + ZabbixServer server = Preconditions.checkNotNull(entity.getConfig(ZabbixMonitored.ZABBIX_SERVER), "The ZABBIX_SERVER config key must be set on the entity"); + server(server); + } + // Now create feed + T result = (T) new ZabbixFeed(this); + result.setEntity(checkNotNull(entity, "entity")); + built = true; + if (suspended) result.suspend(); + result.start(); + return result; + } + @Override + protected void finalize() { + if (!built) log.warn("ZabbixFeed.Builder created, but build() never called"); + } + } + + protected static class ZabbixPollIdentifier { + final String itemName; + + protected ZabbixPollIdentifier(String itemName) { + this.itemName = checkNotNull(itemName, "itemName"); + } + + @Override + public int hashCode() { + return Objects.hashCode(itemName); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof ZabbixPollIdentifier)) { + return false; + } + ZabbixPollIdentifier o = (ZabbixPollIdentifier) other; + return Objects.equal(itemName, o.itemName); + } + } + + // Flag set when the Zabbix agent is registered for a host + protected final AtomicBoolean registered = new AtomicBoolean(false); + + /** + * For rebind; do not call directly; use builder + */ + public ZabbixFeed() { + } + + protected ZabbixFeed(final Builder<? extends ZabbixFeed, ?> builder) { + setConfig(BASE_URI_PROVIDER, builder.baseUriProvider); + if (builder.baseUri != null) { + if (builder.baseUriProvider != null) { + throw new IllegalStateException("Not permitted to supply baseUri and baseUriProvider"); + } + setConfig(BASE_URI_PROVIDER, Suppliers.ofInstance(builder.baseUri)); + } else { + setConfig(BASE_URI_PROVIDER, checkNotNull(builder.baseUriProvider, "baseUriProvider and baseUri")); + } + + setConfig(GROUP_ID, checkNotNull(builder.groupId, "Zabbix groupId must be set")); + setConfig(TEMPLATE_ID, checkNotNull(builder.templateId, "Zabbix templateId must be set")); + setConfig(UNIQUE_HOSTNAME_GENERATOR, checkNotNull(builder.uniqueHostnameGenerator, "uniqueHostnameGenerator")); + + Set<ZabbixPollConfig<?>> polls = Sets.newLinkedHashSet(); + for (ZabbixPollConfig<?> config : builder.polls) { + @SuppressWarnings({ "unchecked", "rawtypes" }) + ZabbixPollConfig<?> configCopy = new ZabbixPollConfig(config); + if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); + polls.add(configCopy); + } + setConfig(POLLS, polls); + initUniqueTag(builder.uniqueTag, polls); + } + + @Override + protected void preStart() { + final Supplier<URI> baseUriProvider = getConfig(BASE_URI_PROVIDER); + final Function<? super EntityLocal, String> uniqueHostnameGenerator = getConfig(UNIQUE_HOSTNAME_GENERATOR); + final Integer groupId = getConfig(GROUP_ID); + final Integer templateId = getConfig(TEMPLATE_ID); + final Set<ZabbixPollConfig<?>> polls = getConfig(POLLS); + + log.info("starting zabbix feed for {}", entity); + + // TODO if supplier returns null, we may wish to defer initialization until url available? + // TODO for https should we really trust all? + final HttpClient httpClient = HttpTool.httpClientBuilder() + .trustAll() + .clientConnectionManager(new ThreadSafeClientConnManager()) + .reuseStrategy(new NoConnectionReuseStrategy()) + .uri(baseUriProvider.get()) + .build(); + + // Registration job, calls Zabbix host.create API + final Callable<HttpToolResponse> registerJob = new Callable<HttpToolResponse>() { + @Override + public HttpToolResponse call() throws Exception { + if (!registered.get()) { + // Find the first machine, if available + Optional<Location> location = Iterables.tryFind(entity.getLocations(), Predicates.instanceOf(MachineLocation.class)); + if (!location.isPresent()) { + return null; // Do nothing until location is present + } + MachineLocation machine = (MachineLocation) location.get(); + + String host = uniqueHostnameGenerator.apply(entity); + + // Select address and port using port-forwarding if available + String address = entity.getAttribute(Attributes.ADDRESS); + Integer port = entity.getAttribute(ZabbixMonitored.ZABBIX_AGENT_PORT); + if (machine instanceof SupportsPortForwarding) { + Cidr management = entity.getConfig(BrooklynAccessUtils.MANAGEMENT_ACCESS_CIDR); + HostAndPort forwarded = ((SupportsPortForwarding) machine).getSocketEndpointFor(management, port); + address = forwarded.getHostText(); + port = forwarded.getPort(); + } + + // Fill in the JSON template and POST it + byte[] body = JSON_HOST_CREATE + .replace("{{token}}", entity.getConfig(ZabbixMonitored.ZABBIX_SERVER).getAttribute(ZabbixServer.ZABBIX_TOKEN)) + .replace("{{host}}", host) + .replace("{{ip}}", address) + .replace("{{port}}", Integer.toString(port)) + .replace("{{groupId}}", Integer.toString(groupId)) + .replace("{{templateId}}", Integer.toString(templateId)) + .replace("{{id}}", Integer.toString(id.incrementAndGet())) + .getBytes(); + + return HttpTool.httpPost(httpClient, baseUriProvider.get(), ImmutableMap.of("Content-Type", "application/json"), body); + } + return null; + } + }; + + // The handler for the registration job + PollHandler<? super HttpToolResponse> registrationHandler = new PollHandler<HttpToolResponse>() { + @Override + public void onSuccess(HttpToolResponse val) { + if (registered.get() || val == null) { + return; // Skip if we are registered already or no data from job + } + JsonObject response = HttpValueFunctions.jsonContents().apply(val).getAsJsonObject(); + if (response.has("error")) { + // Parse the JSON error object and log the message + JsonObject error = response.get("error").getAsJsonObject(); + String message = error.get("message").getAsString(); + String data = error.get("data").getAsString(); + log.warn("zabbix failed registering host - {}: {}", message, data); + } else if (response.has("result")) { + // Parse the JSON result object and save the hostId + JsonObject result = response.get("result").getAsJsonObject(); + String hostId = result.get("hostids").getAsJsonArray().get(0).getAsString(); + // Update the registered status if not set + if (registered.compareAndSet(false, true)) { + entity.setAttribute(ZabbixMonitored.ZABBIX_AGENT_HOSTID, hostId); + log.info("zabbix registered host as id {}", hostId); + } + } else { + throw new IllegalStateException(String.format("zabbix host registration returned invalid result: %s", response.toString())); + } + } + @Override + public boolean checkSuccess(HttpToolResponse val) { + return (val.getResponseCode() == 200); + } + @Override + public void onFailure(HttpToolResponse val) { + log.warn("zabbix sever returned failure code: {}", val.getResponseCode()); + } + @Override + public void onException(Exception exception) { + log.warn("zabbix exception registering host", exception); + } + @Override + public String toString() { + return super.toString()+"["+getDescription()+"]"; + } + @Override + public String getDescription() { + return "Zabbix rest poll"; + } + }; + + // Schedule registration attempt once per second + getPoller().scheduleAtFixedRate(registerJob, registrationHandler, 1000l); // TODO make configurable + + // Create a polling job for each Zabbix metric + for (final ZabbixPollConfig<?> config : polls) { + Callable<HttpToolResponse> pollJob = new Callable<HttpToolResponse>() { + @Override + public HttpToolResponse call() throws Exception { + if (registered.get()) { + if (log.isTraceEnabled()) log.trace("zabbix polling {} for {}", entity, config); + byte[] body = JSON_ITEM_GET + .replace("{{token}}", entity.getConfig(ZabbixMonitored.ZABBIX_SERVER).getAttribute(ZabbixServer.ZABBIX_TOKEN)) + .replace("{{hostId}}", entity.getAttribute(ZabbixMonitored.ZABBIX_AGENT_HOSTID)) + .replace("{{itemKey}}", config.getItemKey()) + .replace("{{id}}", Integer.toString(id.incrementAndGet())) + .getBytes(); + + return HttpTool.httpPost(httpClient, baseUriProvider.get(), ImmutableMap.of("Content-Type", "application/json"), body); + } else { + throw new IllegalStateException("zabbix agent not yet registered"); + } + } + }; + + // Schedule the Zabbix polling job + AttributePollHandler<? super HttpToolResponse> pollHandler = new AttributePollHandler<HttpToolResponse>(config, entity, this); + long minPeriod = Integer.MAX_VALUE; // TODO make configurable + if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); + getPoller().scheduleAtFixedRate(pollJob, pollHandler, minPeriod); + } + + } + + @SuppressWarnings("unchecked") + protected Poller<HttpToolResponse> getPoller() { + return (Poller<HttpToolResponse>) super.getPoller(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixMonitored.java ---------------------------------------------------------------------- diff --git a/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixMonitored.java b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixMonitored.java new file mode 100644 index 0000000..78e9dfd --- /dev/null +++ b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixMonitored.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.monitoring.zabbix; + +import brooklyn.config.ConfigKey; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.util.flags.SetFromFlag; + +public interface ZabbixMonitored { + + /** The entity representing the Zabbix server monitoring an entity. */ + @SetFromFlag("zabbixServer") + ConfigKey<ZabbixServer> ZABBIX_SERVER = new BasicConfigKey<ZabbixServer>(ZabbixServer.class, "zabbix.server.entity", "Zabbix server for this entity"); + + PortAttributeSensorAndConfigKey ZABBIX_AGENT_PORT = new PortAttributeSensorAndConfigKey("zabbix.agent.port", "The port the Zabbix agent is listening on", "10050+"); + + AttributeSensor<String> ZABBIX_AGENT_HOSTID = new BasicAttributeSensor<String>(String.class, "zabbix.agent.hostid", "The hostId for a Zabbix monitored agent"); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixPollConfig.java ---------------------------------------------------------------------- diff --git a/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixPollConfig.java b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixPollConfig.java new file mode 100644 index 0000000..a215133 --- /dev/null +++ b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixPollConfig.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.monitoring.zabbix; + +import javax.annotation.Nullable; + +import brooklyn.event.AttributeSensor; +import brooklyn.event.feed.PollConfig; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.event.feed.http.JsonFunctions; +import brooklyn.util.collections.MutableList; +import brooklyn.util.http.HttpToolResponse; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.gson.JsonElement; + +public class ZabbixPollConfig<T> extends PollConfig<HttpToolResponse, T, ZabbixPollConfig<T>> { + + private String itemKey; + + public ZabbixPollConfig(AttributeSensor<T> sensor) { + super(sensor); + // Add onSuccess method to extract the last value of the item + // FIXME Fix generics + onSuccess((Function)HttpValueFunctions.chain( + HttpValueFunctions.jsonContents(), + new Function<JsonElement, JsonElement>() { + @Override + public JsonElement apply(@Nullable JsonElement input) { + Preconditions.checkNotNull(input, "JSON input"); + return input.getAsJsonObject().get("result") + .getAsJsonArray().get(0) + .getAsJsonObject().get("lastvalue"); + } + }, + JsonFunctions.cast(getSensor().getType()))); + } + + public ZabbixPollConfig(ZabbixPollConfig<T> other) { + super(other); + this.itemKey = other.getItemKey(); + } + + public String getItemKey() { + return itemKey; + } + + public ZabbixPollConfig<T> itemKey(String val) { + this.itemKey = val; + return this; + } + + @Override + protected MutableList<Object> equalsFields() { + return super.equalsFields().appendIfNotNull(itemKey); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixServer.java ---------------------------------------------------------------------- diff --git a/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixServer.java b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixServer.java new file mode 100644 index 0000000..db287e8 --- /dev/null +++ b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixServer.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.monitoring.zabbix; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.Entity; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicConfigKey; +import brooklyn.util.flags.SetFromFlag; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; + +@ImplementedBy(ZabbixServerImpl.class) +public interface ZabbixServer extends Entity { + + @SuppressWarnings({ "unchecked", "rawtypes" }) + @SetFromFlag("filter") + ConfigKey<Predicate<? super Entity>> ENTITY_FILTER = new BasicConfigKey(Predicate.class, "zabbix.server.filter", "Filter for entities which will automatically be monitored", Predicates.instanceOf(ZabbixMonitored.class)); + + @SetFromFlag("serverApiUrl") + ConfigKey<String> ZABBIX_SERVER_API_URL = new BasicConfigKey<String>(String.class, "zabbix.server.apiUrl", "Main Zabbix server API URL"); + + @SetFromFlag("username") + ConfigKey<String> ZABBIX_SERVER_USERNAME = new BasicConfigKey<String>(String.class, "zabbix.server.username", "Zabbix server API login user"); + + @SetFromFlag("password") + ConfigKey<String> ZABBIX_SERVER_PASSWORD = new BasicConfigKey<String>(String.class, "zabbix.server.password", "Zabbix server API login password"); + + ConfigKey<Integer> ZABBIX_SESSION_TIMEOUT = new BasicConfigKey<Integer>(Integer.class, "zabbix.server.sessionTimeout", "Zabbix server API session timeout period (seconds)", 3600); + + AttributeSensor<String> ZABBIX_TOKEN = new BasicAttributeSensor<String>(String.class, "zabbix.server.token", "Zabbix server API authentication token"); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixServerImpl.java ---------------------------------------------------------------------- diff --git a/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixServerImpl.java b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixServerImpl.java new file mode 100644 index 0000000..9ad8ac0 --- /dev/null +++ b/sandbox/monitoring/src/main/java/org/apache/brooklyn/entity/monitoring/zabbix/ZabbixServerImpl.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.monitoring.zabbix; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.Entity; +import brooklyn.entity.basic.AbstractEntity; +import brooklyn.entity.basic.DynamicGroup; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.entity.trait.Startable; +import brooklyn.event.feed.http.HttpFeed; +import brooklyn.event.feed.http.HttpPollConfig; +import brooklyn.event.feed.http.HttpValueFunctions; +import brooklyn.location.Location; +import brooklyn.location.basic.SshMachineLocation; +import brooklyn.policy.PolicySpec; + +import com.google.common.base.Functions; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; + +public class ZabbixServerImpl extends AbstractEntity implements ZabbixServer { + + private static final Logger log = LoggerFactory.getLogger(ZabbixServerImpl.class); + + private Object[] mutex = new Object[0]; + private DynamicGroup monitoredEntities; + private AgentTrackingPolicy policy; + private Multimap<Location, Entity> entityLocations = HashMultimap.create(); + + private transient HttpFeed login; + + @Override + public void init() { + super.init(); + Predicate<? super Entity> filter = getConfig(ENTITY_FILTER); + monitoredEntities = addChild(EntitySpec.create(DynamicGroup.class) + .configure(DynamicGroup.ENTITY_FILTER, filter) + .displayName("agents")); + } + + @Override + public void onManagementStarted() { + final byte[] jsonData = ZabbixFeed.JSON_USER_LOGIN + .replace("{{username}}", getConfig(ZABBIX_SERVER_USERNAME)) + .replace("{{password}}", getConfig(ZABBIX_SERVER_PASSWORD)) + .getBytes(); + login = HttpFeed.builder() + .entity(this) + .baseUri(getConfig(ZABBIX_SERVER_API_URL)) + .headers(ImmutableMap.of("Content-Type", "application/json")) + .poll(new HttpPollConfig<String>(ZABBIX_TOKEN) + .method("POST") + .body(jsonData) + .onFailure(Functions.constant("")) + .onSuccess(HttpValueFunctions.jsonContents("result", String.class))) + .build(); + + policy = addPolicy(PolicySpec.create(AgentTrackingPolicy.class) + .displayName("Zabbix Agent Tracker") + .configure("group", monitoredEntities)); + + for (Entity each : monitoredEntities.getMembers()) { + added(each); + } + + setAttribute(Startable.SERVICE_UP, true); + } + + public static class AgentTrackingPolicy extends AbstractMembershipTrackingPolicy { + @Override + protected void onEntityChange(Entity member) { + ((ZabbixServerImpl)entity).added(member); } + @Override + protected void onEntityAdded(Entity member) { + } // Ignore + @Override + protected void onEntityRemoved(Entity member) { + ((ZabbixServerImpl)entity).removed(member); + } + } + + public void added(Entity member) { + synchronized (mutex) { + Optional<Location> location = Iterables.tryFind(member.getLocations(), Predicates.instanceOf(SshMachineLocation.class)); + if (location.isPresent() && member.getAttribute(Startable.SERVICE_UP)) { + SshMachineLocation machine = (SshMachineLocation) location.get(); + if (!entityLocations.containsKey(machine)) { + entityLocations.put(machine, member); + // Configure the Zabbix agent + List<String> commands = ImmutableList.<String>builder() + .add("sed -i.bk 's/\\$HOSTNAME/" + machine.getDisplayName() + "/' /etc/zabbix/zabbix_agentd.conf") + .add("zabbix_agentd") + .build(); + int result = machine.execCommands("configuring zabbix_agentd", commands); + if (result == 0) { + log.info("zabbix_agentd configured on {} at {}", member, machine); + } else { + log.warn("failed to configure zabbix_agentd on {}, status {}", machine, result); + } + } + } else { + log.warn("zabbix added({}) called but no location or service not started", member); + } + } + } + + public void removed(Entity member) { + synchronized (mutex) { + for (Location location : member.getLocations()) { + entityLocations.remove(location, member); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/pom.xml ---------------------------------------------------------------------- diff --git a/sandbox/nosql/pom.xml b/sandbox/nosql/pom.xml index af576d0..ccf1487 100644 --- a/sandbox/nosql/pom.xml +++ b/sandbox/nosql/pom.xml @@ -101,7 +101,7 @@ the given components. These are files "without any degree of creativity" from the perspective of the Brooklyn/Apache contribution. --> - <exclude>src/main/resources/brooklyn/entity/nosql/hazelcast/hazelcast-brooklyn.xml</exclude> + <exclude>src/main/resources/org/apache/brooklyn/entity/nosql/hazelcast/hazelcast-brooklyn.xml</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastCluster.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastCluster.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastCluster.java deleted file mode 100644 index c91b78c..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastCluster.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.hazelcast; - -import java.util.List; - -import com.google.common.reflect.TypeToken; - -import org.apache.brooklyn.catalog.Catalog; -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.group.DynamicCluster; -import brooklyn.entity.proxying.ImplementedBy; -import brooklyn.event.AttributeSensor; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.Sensors; -import brooklyn.util.flags.SetFromFlag; - -/** - * A cluster of {@link HazelcastNode}s based on {@link DynamicCluster}. - */ -@Catalog(name="Hazelcast Cluster", description="Hazelcast is a clustering and highly scalable data distribution platform for Java.") - -@ImplementedBy(HazelcastClusterImpl.class) -public interface HazelcastCluster extends DynamicCluster { - - @SetFromFlag("clusterName") - BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, - "hazelcast.cluster.name", "Name of the Hazelcast cluster", "HazelcastCluster"); - - @SetFromFlag("clusterPassword") - ConfigKey<String> CLUSTER_PASSWORD = - ConfigKeys.newStringConfigKey("hazelcast.cluster.password", "Hazelcast cluster password."); - - @SuppressWarnings("serial") - AttributeSensor<List<String>> PUBLIC_CLUSTER_NODES = Sensors.newSensor(new TypeToken<List<String>>() {}, - "hazelcast.cluster.public.nodes", "List of public addresses of all nodes in the cluster"); - - String getClusterName(); - - String getClusterPassword(); - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastClusterImpl.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastClusterImpl.java deleted file mode 100644 index e911318..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastClusterImpl.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.hazelcast; - -import java.util.Collection; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.EntityInternal; -import brooklyn.entity.group.AbstractMembershipTrackingPolicy; -import brooklyn.entity.group.DynamicClusterImpl; -import brooklyn.entity.proxying.EntitySpec; -import brooklyn.location.Location; -import brooklyn.policy.PolicySpec; -import brooklyn.util.text.Strings; - -public class HazelcastClusterImpl extends DynamicClusterImpl implements HazelcastCluster { - private static final Logger LOG = LoggerFactory.getLogger(HazelcastClusterImpl.class); - - private static final AtomicInteger nextMemberId = new AtomicInteger(0); - - @Override - protected EntitySpec<?> getMemberSpec() { - EntitySpec<?> spec = EntitySpec.create(getConfig(MEMBER_SPEC, EntitySpec.create(HazelcastNode.class))); - - spec.configure(HazelcastNode.GROUP_NAME, getConfig(HazelcastClusterImpl.CLUSTER_NAME)); - - if (LOG.isInfoEnabled()) { - LOG.info("Cluster name : {} : used as a group name", getConfig(HazelcastNode.GROUP_NAME)); - } - - spec.configure(HazelcastNode.GROUP_PASSWORD, getClusterPassword()); - - return spec; - } - - @Override - public void init() { - super.init(); - - String clusterPassword = getClusterPassword(); - - if (Strings.isBlank(clusterPassword)) { - if (LOG.isInfoEnabled()) { - LOG.info(this + " cluster password not provided for " + CLUSTER_PASSWORD.getName() + " : generating random password"); - } - setConfig(CLUSTER_PASSWORD, Strings.makeRandomId(12)); - } - - addPolicy(PolicySpec.create(MemberTrackingPolicy.class) - .displayName("Hazelcast members tracker") - .configure("group", this)); - } - - public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { - @Override - protected void onEntityChange(Entity member) { - } - - @Override - protected void onEntityAdded(Entity member) { - if (member.getAttribute(HazelcastNode.NODE_NAME) == null) { - ((EntityInternal) member).setAttribute(HazelcastNode.NODE_NAME, "hazelcast-" + nextMemberId.incrementAndGet()); - if (LOG.isInfoEnabled()) { - LOG.info("Node {} added to the cluster", member); - } - } - } - - @Override - protected void onEntityRemoved(Entity member) { - } - }; - - @Override - public String getClusterName() { - return getConfig(CLUSTER_NAME); - } - - @Override - public String getClusterPassword() { - return getConfig(CLUSTER_PASSWORD); - } - - @Override - protected void initEnrichers() { - super.initEnrichers(); - - } - - @Override - public void start(Collection<? extends Location> locations) { - super.start(locations); - - - List<String> clusterNodes = Lists.newArrayList(); - for (Entity member : getMembers()) { - clusterNodes.add(member.getAttribute(Attributes.ADDRESS)); - } - setAttribute(PUBLIC_CLUSTER_NODES, clusterNodes); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNode.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNode.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNode.java deleted file mode 100644 index 40aa330..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNode.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.hazelcast; - -import org.apache.brooklyn.catalog.Catalog; -import brooklyn.config.ConfigKey; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.java.UsesJava; -import brooklyn.entity.java.UsesJmx; -import brooklyn.entity.proxying.ImplementedBy; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; -import brooklyn.location.basic.PortRanges; -import brooklyn.util.flags.SetFromFlag; -import brooklyn.util.javalang.JavaClassNames; - -/** - * An {@link brooklyn.entity.Entity} that represents an Hazelcast node - */ -@Catalog(name="Hazelcast Node", description="Hazelcast is a clustering and highly scalable data distribution platform for Java.") - -@ImplementedBy(HazelcastNodeImpl.class) -public interface HazelcastNode extends SoftwareProcess, UsesJava, UsesJmx { - @SetFromFlag("version") - ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "3.4.2"); - - @SetFromFlag("downloadUrl") - BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - SoftwareProcess.DOWNLOAD_URL, "https://repo1.maven.org/maven2/com/hazelcast/hazelcast/${version}/hazelcast-${version}.jar"); - - @SetFromFlag("configTemplateUrl") - ConfigKey<String> CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( - "hazelcast.node.config.templateUrl", "Template file (in freemarker format) for the Hazelcat config file", - JavaClassNames.resolveClasspathUrl(HazelcastNode.class, "hazelcast-brooklyn.xml")); - - @SetFromFlag("configFileName") - ConfigKey<String> CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey( - "hazelcast.node.config.fileName", "Name of the Hazelcast config file", "hazelcast.xml"); - - @SetFromFlag("nodeName") - StringAttributeSensorAndConfigKey NODE_NAME = new StringAttributeSensorAndConfigKey("hazelcast.node.name", - "Node name (or randomly selected if not set", null); - - @SetFromFlag("nodeHeapMemorySize") - ConfigKey<String> NODE_HEAP_MEMORY_SIZE = ConfigKeys.newStringConfigKey( - "hazelcast.node.heap.memory.size", "Node's heap memory size (-Xmx and -Xms) in megabytes. Default: 256m", "256m"); - - @SetFromFlag("nodePort") - PortAttributeSensorAndConfigKey NODE_PORT = new PortAttributeSensorAndConfigKey("hazelcast.node.port", "Hazelcast communication port", PortRanges.fromString("5701+")); - - /** - * Specifies the group name in the configuration file. Each Hazelcast cluster has a separate group. - */ - @SetFromFlag("groupName") - ConfigKey<String> GROUP_NAME = ConfigKeys.newStringConfigKey("hazelcast.group.name", - "Group name", "brooklyn"); - - @SetFromFlag("groupPassword") - ConfigKey<String> GROUP_PASSWORD = ConfigKeys.newStringConfigKey("hazelcast.group.password", - "Group password", "brooklyn"); - - String getNodeName(); - - Integer getNodePort(); - - String getGroupName(); - - String getGroupPassword(); - - String getHostname(); - - String getHostAddress(); - - String getPrivateIpAddress(); - - String getListenAddress(); - - String getHeapMemorySize(); -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeDriver.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeDriver.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeDriver.java deleted file mode 100644 index 4e53add..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeDriver.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.hazelcast; - -import brooklyn.entity.basic.SoftwareProcessDriver; - -public interface HazelcastNodeDriver extends SoftwareProcessDriver { - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeImpl.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeImpl.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeImpl.java deleted file mode 100644 index 0369934..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeImpl.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.hazelcast; - -import java.net.URI; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.basic.Attributes; -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.event.feed.http.HttpFeed; -import brooklyn.event.feed.http.HttpPollConfig; -import brooklyn.event.feed.http.HttpValueFunctions; -import brooklyn.location.access.BrooklynAccessUtils; -import brooklyn.util.text.Strings; - -import com.google.common.base.Functions; -import com.google.common.net.HostAndPort; - -public class HazelcastNodeImpl extends SoftwareProcessImpl implements HazelcastNode { - - private static final Logger LOG = LoggerFactory.getLogger(HazelcastNodeImpl.class); - - HttpFeed httpFeed; - - @Override - public Class<HazelcastNodeDriver> getDriverInterface() { - return HazelcastNodeDriver.class; - } - - @Override - protected void connectSensors() { - super.connectSensors(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting sensors for node: {} ", getAttribute(Attributes.HOSTNAME)); - } - - HostAndPort hp = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, getNodePort()); - - String nodeUri = String.format("http://%s:%d/hazelcast/rest/cluster", hp.getHostText(), hp.getPort()); - setAttribute(Attributes.MAIN_URI, URI.create(nodeUri)); - - if (LOG.isDebugEnabled()) { - LOG.debug("Node {} is using {} as a main URI", this, nodeUri); - } - - httpFeed = HttpFeed.builder() - .entity(this) - .period(3000, TimeUnit.MILLISECONDS) - .baseUri(nodeUri) - .poll(new HttpPollConfig<Boolean>(SERVICE_UP) - .onSuccess(HttpValueFunctions.responseCodeEquals(200)) - .onFailureOrException(Functions.constant(false))) - .build(); - } - - @Override - protected void disconnectSensors() { - if (httpFeed != null) { - httpFeed.stop(); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Disconnecting sensors for node: {} ", getAttribute(Attributes.HOSTNAME)); - } - - super.disconnectSensors(); - disconnectServiceUpIsRunning(); - } - - - @Override - public String getGroupName() { - return getConfig(HazelcastNode.GROUP_NAME); - } - - @Override - public String getGroupPassword() { - return getConfig(HazelcastNode.GROUP_PASSWORD); - } - - @Override - public String getNodeName() { - return getAttribute(HazelcastNode.NODE_NAME); - } - - @Override - public Integer getNodePort() { - return getAttribute(HazelcastNode.NODE_PORT); - } - - @Override - public String getHostname() { - return getAttribute(HOSTNAME); - } - - @Override - public String getHostAddress() { - return getAttribute(ADDRESS); - } - - @Override - public String getPrivateIpAddress() { - return getAttribute(SUBNET_ADDRESS); - } - - @Override - public String getListenAddress() { - String listenAddress = getPrivateIpAddress(); - - if (Strings.isBlank(listenAddress)) { - listenAddress = getAttribute(ADDRESS); - } - - if (LOG.isInfoEnabled()) { - LOG.info("Node {} is listening on {}", this, listenAddress); - } - - - return listenAddress; - } - - - @Override - public String getHeapMemorySize() { - return getConfig(HazelcastNode.NODE_HEAP_MEMORY_SIZE); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeSshDriver.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeSshDriver.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeSshDriver.java deleted file mode 100644 index 5527a69..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/hazelcast/HazelcastNodeSshDriver.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.hazelcast; - -import static java.lang.String.format; - -import java.util.List; -import java.util.concurrent.ExecutionException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.entity.Entity; -import brooklyn.entity.basic.Entities; -import brooklyn.entity.basic.EntityLocal; -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; -import brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.os.Os; -import brooklyn.util.ssh.BashCommands; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -public class HazelcastNodeSshDriver extends JavaSoftwareProcessSshDriver implements HazelcastNodeDriver { - - private static final Logger LOG = LoggerFactory.getLogger(HazelcastNodeSshDriver.class); - - public HazelcastNodeSshDriver(EntityLocal entity, SshMachineLocation machine) { - super(entity, machine); - } - - @Override - public void preInstall() { - resolver = Entities.newDownloader(this); - } - - @Override - public void install() { - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - - List<String> commands = ImmutableList.<String>builder() - .add(BashCommands.installJavaLatestOrWarn()) - .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) - .build(); - - newScript(INSTALLING).body.append(commands).execute(); - } - - @Override - public void customize() { - if (LOG.isInfoEnabled()) { - LOG.info("Customizing {}", entity.getAttribute(HazelcastNode.NODE_NAME)); - } - - ImmutableList.Builder<String> commands = new ImmutableList.Builder<String>() - .add("mkdir -p lib conf log") - .add(String.format("cp %s/%s %s/lib/", getInstallDir(), resolver.getFilename(), getRunDir())); - - newScript(CUSTOMIZING) - .body.append(commands.build()) - .failOnNonZeroResultCode() - .execute(); - - copyTemplate(entity.getConfig(HazelcastNode.CONFIG_TEMPLATE_URL), Os.mergePathsUnix(getRunDir(), "conf", getConfigFileName())); - - } - - @Override - public void launch() { - - entity.setAttribute(HazelcastNode.PID_FILE, Os.mergePathsUnix(getRunDir(), PID_FILENAME)); - - String maxHeapMemorySize = getHeapMemorySize(); - - if (LOG.isInfoEnabled()) { - LOG.info("Launching {} with heap memory of {}", entity, maxHeapMemorySize); - } - - // Setting initial heap size (Xms) size to match max heap size (Xms) at first - String initialHeapMemorySize = maxHeapMemorySize; - - StringBuilder commandBuilder = new StringBuilder() - .append(format("nohup java -cp ./lib/%s", resolver.getFilename())) - .append(format(" -Xmx%s -Xms%s", maxHeapMemorySize, initialHeapMemorySize)) - .append(format(" -Dhazelcast.config=./conf/%s", getConfigFileName())) - .append(format(" com.hazelcast.core.server.StartServer >> %s 2>&1 </dev/null &", getLogFileLocation())); - - newScript(MutableMap.of(USE_PID_FILE, true), LAUNCHING) - .updateTaskAndFailOnNonZeroResultCode() - .body.append(commandBuilder.toString()) - .execute(); - } - - public String getConfigFileName() { - return entity.getConfig(HazelcastNode.CONFIG_FILE_NAME); - } - - public String getHeapMemorySize() { - return entity.getConfig(HazelcastNode.NODE_HEAP_MEMORY_SIZE); - } - - @Override - public boolean isRunning() { - return newScript(MutableMap.of(USE_PID_FILE, true), CHECK_RUNNING).execute() == 0; - } - - @Override - public void stop() { - newScript(MutableMap.of(USE_PID_FILE, true), STOPPING).execute(); - } - - @Override - public void kill() { - newScript(MutableMap.of(USE_PID_FILE, true), KILLING).execute(); - } - - public List<String> getHazelcastNodesList() throws ExecutionException, InterruptedException { - HazelcastCluster cluster = (HazelcastCluster) entity.getParent(); - List<String> result = Lists.newArrayList(); - - for (Entity member : cluster.getMembers()) { - String address = Entities.attributeSupplierWhenReady(member, HazelcastNode.SUBNET_ADDRESS).get(); - Integer port = Entities.attributeSupplierWhenReady(member, HazelcastNode.NODE_PORT).get(); - - String addressAndPort = String.format("%s:%d", address, port); - - if (LOG.isInfoEnabled()) { - LOG.info("Adding {} to the members' list of {}", addressAndPort, entity.getAttribute(HazelcastNode.NODE_NAME)); - } - result.add(addressAndPort); - } - - return result; - } - - @Override - protected String getLogFileLocation() { - return Os.mergePathsUnix(getRunDir(),"/log/out.log"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5Driver.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5Driver.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5Driver.java deleted file mode 100644 index ebd5fb8..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5Driver.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.infinispan; - -public interface Infinispan5Driver { - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5Server.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5Server.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5Server.java deleted file mode 100644 index e029cf1..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5Server.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.infinispan; - -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import brooklyn.config.ConfigKey; -import brooklyn.entity.Entity; -import brooklyn.entity.basic.ConfigKeys; -import brooklyn.entity.basic.SoftwareProcess; -import brooklyn.entity.basic.SoftwareProcessImpl; -import brooklyn.entity.java.UsesJmx; -import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; -import brooklyn.event.basic.PortAttributeSensorAndConfigKey; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.flags.SetFromFlag; - -/** - * An {@link brooklyn.entity.Entity} that represents an Infinispan service - */ -public class Infinispan5Server extends SoftwareProcessImpl implements UsesJmx { - private static final Logger log = LoggerFactory.getLogger(Infinispan5Server.class); - - public static final BasicAttributeSensorAndConfigKey<String> PROTOCOL = new BasicAttributeSensorAndConfigKey<String>( - String.class, "infinispan.server.protocol", - "Infinispan protocol (e.g. memcached, hotrod, or websocket)", "memcached"); - - public static final PortAttributeSensorAndConfigKey PORT = new PortAttributeSensorAndConfigKey( - "infinispan.server.port", "TCP port number to listen on"); - - @SetFromFlag("version") - public static final ConfigKey<String> SUGGESTED_VERSION = - ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "5.0.0.CR8"); - - // Default filename is "infinispan-${version}-all.zip" - @SetFromFlag("downloadUrl") - public static final BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( - SoftwareProcess.DOWNLOAD_URL, "http://sourceforge.net/projects/infinispan/files/infinispan/${version}/infinispan-${version}-all.zip/download"); - - public Infinispan5Server() { - this(MutableMap.of(), null); - } - public Infinispan5Server(Map properties) { - this(properties, null); - } - public Infinispan5Server(Entity parent) { - this(MutableMap.of(), parent); - } - public Infinispan5Server(Map properties, Entity parent) { - super(properties, parent); - } - - @Override - public Class getDriverInterface() { - return Infinispan5Driver.class; - } - - @Override - protected void connectSensors() { - super.connectSensors(); - super.connectServiceUpIsRunning(); - } - - @Override - protected void disconnectSensors() { - super.disconnectServiceUpIsRunning(); - super.disconnectSensors(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5SshDriver.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5SshDriver.java b/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5SshDriver.java deleted file mode 100644 index 361a6ab..0000000 --- a/sandbox/nosql/src/main/java/brooklyn/entity/nosql/infinispan/Infinispan5SshDriver.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.entity.nosql.infinispan; - -import static java.lang.String.format; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import brooklyn.entity.java.JavaSoftwareProcessSshDriver; -import brooklyn.location.Location; -import brooklyn.location.basic.SshMachineLocation; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.net.Networking; -import brooklyn.util.ssh.BashCommands; - -import com.google.common.collect.ImmutableList; - -/** - * Start a {@link TomcatServer} in a {@link Location} accessible over ssh. - */ -public class Infinispan5SshDriver extends JavaSoftwareProcessSshDriver implements Infinispan5Driver { - - public Infinispan5SshDriver(Infinispan5Server entity, SshMachineLocation machine) { - super(entity, machine); - } - - @Override - protected String getLogFileLocation() { - throw new UnsupportedOperationException("Work in progress"); - } - - protected String getProtocol() { - return entity.getAttribute(Infinispan5Server.PROTOCOL); - } - - protected Integer getPort() { - return entity.getAttribute(Infinispan5Server.PORT); - } - - @Override - public void install() { - List<String> urls = resolver.getTargets(); - String saveAs = resolver.getFilename(); - - List<String> commands = ImmutableList.<String>builder() - .addAll(BashCommands.commandsToDownloadUrlsAs(urls, saveAs)) - .add(BashCommands.INSTALL_ZIP) - .add("unzip " + saveAs) - .build(); - - newScript(INSTALLING). - failOnNonZeroResultCode(). - body.append(commands).execute(); - } - - @Override - public void customize() { - // TODO create and reference a conf.xml? And start with --cache_config <path> - Map ports = MutableMap.of("port", getPort(), "jmxPort", getJmxPort()); - Networking.checkPortsValid(ports); - - newScript(CUSTOMIZING) - .body.append() - .execute(); - } - - @Override - public void launch() { - // FIXME Do we want to redirect stdout/stderr: >> %s/console 2>&1 </dev/null &", getRunDir()) - newScript(MutableMap.of("usePidFile", true), LAUNCHING). - body.append( - format("%s/bin/startServer.sh --protocol %s " - +(getPort() != null ? " --port %s" : "")+" &", - getExpandedInstallDir(), getProtocol(), getPort())) - .execute(); - } - - - @Override - public boolean isRunning() { - Map flags = MutableMap.of("usePidFile", true); - return newScript(flags, CHECK_RUNNING).execute() == 0; - } - - @Override - public void stop() { - Map flags = MutableMap.of("usePidFile", true); - newScript(flags, STOPPING).execute(); - } - - @Override - public void kill() { - Map flags = MutableMap.of("usePidFile", true); - newScript(flags, KILLING).execute(); - } - - @Override - protected List<String> getCustomJavaConfigOptions() { - List<String> options = new LinkedList<String>(); - options.addAll(super.getCustomJavaConfigOptions()); - options.add("-Xms200m"); - options.add("-Xmx800m"); - options.add("-XX:MaxPermSize=400m"); - return options; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastCluster.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastCluster.java b/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastCluster.java new file mode 100644 index 0000000..ef4894f --- /dev/null +++ b/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastCluster.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.hazelcast; + +import java.util.List; + +import com.google.common.reflect.TypeToken; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.group.DynamicCluster; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.AttributeSensor; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.Sensors; +import brooklyn.util.flags.SetFromFlag; + +/** + * A cluster of {@link HazelcastNode}s based on {@link DynamicCluster}. + */ +@Catalog(name="Hazelcast Cluster", description="Hazelcast is a clustering and highly scalable data distribution platform for Java.") + +@ImplementedBy(HazelcastClusterImpl.class) +public interface HazelcastCluster extends DynamicCluster { + + @SetFromFlag("clusterName") + BasicAttributeSensorAndConfigKey<String> CLUSTER_NAME = new BasicAttributeSensorAndConfigKey<String>(String.class, + "hazelcast.cluster.name", "Name of the Hazelcast cluster", "HazelcastCluster"); + + @SetFromFlag("clusterPassword") + ConfigKey<String> CLUSTER_PASSWORD = + ConfigKeys.newStringConfigKey("hazelcast.cluster.password", "Hazelcast cluster password."); + + @SuppressWarnings("serial") + AttributeSensor<List<String>> PUBLIC_CLUSTER_NODES = Sensors.newSensor(new TypeToken<List<String>>() {}, + "hazelcast.cluster.public.nodes", "List of public addresses of all nodes in the cluster"); + + String getClusterName(); + + String getClusterPassword(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastClusterImpl.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastClusterImpl.java b/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastClusterImpl.java new file mode 100644 index 0000000..4f837d7 --- /dev/null +++ b/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastClusterImpl.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.hazelcast; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import brooklyn.entity.Entity; +import brooklyn.entity.basic.Attributes; +import brooklyn.entity.basic.EntityInternal; +import brooklyn.entity.group.AbstractMembershipTrackingPolicy; +import brooklyn.entity.group.DynamicClusterImpl; +import brooklyn.entity.proxying.EntitySpec; +import brooklyn.location.Location; +import brooklyn.policy.PolicySpec; +import brooklyn.util.text.Strings; + +public class HazelcastClusterImpl extends DynamicClusterImpl implements HazelcastCluster { + private static final Logger LOG = LoggerFactory.getLogger(HazelcastClusterImpl.class); + + private static final AtomicInteger nextMemberId = new AtomicInteger(0); + + @Override + protected EntitySpec<?> getMemberSpec() { + EntitySpec<?> spec = EntitySpec.create(getConfig(MEMBER_SPEC, EntitySpec.create(HazelcastNode.class))); + + spec.configure(HazelcastNode.GROUP_NAME, getConfig(HazelcastClusterImpl.CLUSTER_NAME)); + + if (LOG.isInfoEnabled()) { + LOG.info("Cluster name : {} : used as a group name", getConfig(HazelcastNode.GROUP_NAME)); + } + + spec.configure(HazelcastNode.GROUP_PASSWORD, getClusterPassword()); + + return spec; + } + + @Override + public void init() { + super.init(); + + String clusterPassword = getClusterPassword(); + + if (Strings.isBlank(clusterPassword)) { + if (LOG.isInfoEnabled()) { + LOG.info(this + " cluster password not provided for " + CLUSTER_PASSWORD.getName() + " : generating random password"); + } + setConfig(CLUSTER_PASSWORD, Strings.makeRandomId(12)); + } + + addPolicy(PolicySpec.create(MemberTrackingPolicy.class) + .displayName("Hazelcast members tracker") + .configure("group", this)); + } + + public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy { + @Override + protected void onEntityChange(Entity member) { + } + + @Override + protected void onEntityAdded(Entity member) { + if (member.getAttribute(HazelcastNode.NODE_NAME) == null) { + ((EntityInternal) member).setAttribute(HazelcastNode.NODE_NAME, "hazelcast-" + nextMemberId.incrementAndGet()); + if (LOG.isInfoEnabled()) { + LOG.info("Node {} added to the cluster", member); + } + } + } + + @Override + protected void onEntityRemoved(Entity member) { + } + }; + + @Override + public String getClusterName() { + return getConfig(CLUSTER_NAME); + } + + @Override + public String getClusterPassword() { + return getConfig(CLUSTER_PASSWORD); + } + + @Override + protected void initEnrichers() { + super.initEnrichers(); + + } + + @Override + public void start(Collection<? extends Location> locations) { + super.start(locations); + + + List<String> clusterNodes = Lists.newArrayList(); + for (Entity member : getMembers()) { + clusterNodes.add(member.getAttribute(Attributes.ADDRESS)); + } + setAttribute(PUBLIC_CLUSTER_NODES, clusterNodes); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastNode.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastNode.java b/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastNode.java new file mode 100644 index 0000000..8f3521c --- /dev/null +++ b/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastNode.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.hazelcast; + +import org.apache.brooklyn.catalog.Catalog; +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.entity.basic.SoftwareProcess; +import brooklyn.entity.java.UsesJava; +import brooklyn.entity.java.UsesJmx; +import brooklyn.entity.proxying.ImplementedBy; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey; +import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey; +import brooklyn.event.basic.PortAttributeSensorAndConfigKey; +import brooklyn.location.basic.PortRanges; +import brooklyn.util.flags.SetFromFlag; +import brooklyn.util.javalang.JavaClassNames; + +/** + * An {@link brooklyn.entity.Entity} that represents an Hazelcast node + */ +@Catalog(name="Hazelcast Node", description="Hazelcast is a clustering and highly scalable data distribution platform for Java.") + +@ImplementedBy(HazelcastNodeImpl.class) +public interface HazelcastNode extends SoftwareProcess, UsesJava, UsesJmx { + @SetFromFlag("version") + ConfigKey<String> SUGGESTED_VERSION = ConfigKeys.newConfigKeyWithDefault(SoftwareProcess.SUGGESTED_VERSION, "3.4.2"); + + @SetFromFlag("downloadUrl") + BasicAttributeSensorAndConfigKey<String> DOWNLOAD_URL = new BasicAttributeSensorAndConfigKey<String>( + SoftwareProcess.DOWNLOAD_URL, "https://repo1.maven.org/maven2/com/hazelcast/hazelcast/${version}/hazelcast-${version}.jar"); + + @SetFromFlag("configTemplateUrl") + ConfigKey<String> CONFIG_TEMPLATE_URL = ConfigKeys.newStringConfigKey( + "hazelcast.node.config.templateUrl", "Template file (in freemarker format) for the Hazelcat config file", + JavaClassNames.resolveClasspathUrl(HazelcastNode.class, "hazelcast-brooklyn.xml")); + + @SetFromFlag("configFileName") + ConfigKey<String> CONFIG_FILE_NAME = ConfigKeys.newStringConfigKey( + "hazelcast.node.config.fileName", "Name of the Hazelcast config file", "hazelcast.xml"); + + @SetFromFlag("nodeName") + StringAttributeSensorAndConfigKey NODE_NAME = new StringAttributeSensorAndConfigKey("hazelcast.node.name", + "Node name (or randomly selected if not set", null); + + @SetFromFlag("nodeHeapMemorySize") + ConfigKey<String> NODE_HEAP_MEMORY_SIZE = ConfigKeys.newStringConfigKey( + "hazelcast.node.heap.memory.size", "Node's heap memory size (-Xmx and -Xms) in megabytes. Default: 256m", "256m"); + + @SetFromFlag("nodePort") + PortAttributeSensorAndConfigKey NODE_PORT = new PortAttributeSensorAndConfigKey("hazelcast.node.port", "Hazelcast communication port", PortRanges.fromString("5701+")); + + /** + * Specifies the group name in the configuration file. Each Hazelcast cluster has a separate group. + */ + @SetFromFlag("groupName") + ConfigKey<String> GROUP_NAME = ConfigKeys.newStringConfigKey("hazelcast.group.name", + "Group name", "brooklyn"); + + @SetFromFlag("groupPassword") + ConfigKey<String> GROUP_PASSWORD = ConfigKeys.newStringConfigKey("hazelcast.group.password", + "Group password", "brooklyn"); + + String getNodeName(); + + Integer getNodePort(); + + String getGroupName(); + + String getGroupPassword(); + + String getHostname(); + + String getHostAddress(); + + String getPrivateIpAddress(); + + String getListenAddress(); + + String getHeapMemorySize(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/070b5ca7/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastNodeDriver.java ---------------------------------------------------------------------- diff --git a/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastNodeDriver.java b/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastNodeDriver.java new file mode 100644 index 0000000..e5aa759 --- /dev/null +++ b/sandbox/nosql/src/main/java/org/apache/brooklyn/entity/nosql/hazelcast/HazelcastNodeDriver.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.entity.nosql.hazelcast; + +import brooklyn.entity.basic.SoftwareProcessDriver; + +public interface HazelcastNodeDriver extends SoftwareProcessDriver { + +}
