http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpFeed.java deleted file mode 100644 index 9ae5431..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpFeed.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.sensor.feed.http; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.net.URI; -import java.net.URL; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.Entities; -import org.apache.brooklyn.sensor.feed.AbstractFeed; -import org.apache.brooklyn.sensor.feed.AttributePollHandler; -import org.apache.brooklyn.sensor.feed.DelegatingPollHandler; -import org.apache.brooklyn.sensor.feed.Poller; -import org.apache.brooklyn.util.core.http.HttpTool; -import org.apache.brooklyn.util.core.http.HttpToolResponse; -import org.apache.brooklyn.util.core.http.HttpTool.HttpClientBuilder; -import org.apache.brooklyn.util.time.Duration; -import org.apache.http.auth.Credentials; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.HttpClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.base.Optional; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; -import com.google.common.reflect.TypeToken; - -/** - * Provides a feed of attribute values, by polling over http. - * - * Example usage (e.g. 
in an entity that extends SoftwareProcessImpl): - * <pre> - * {@code - * private HttpFeed feed; - * - * //@Override - * protected void connectSensors() { - * super.connectSensors(); - * - * feed = HttpFeed.builder() - * .entity(this) - * .period(200) - * .baseUri(String.format("http://%s:%s/management/subsystem/web/connector/http/read-resource", host, port)) - * .baseUriVars(ImmutableMap.of("include-runtime","true")) - * .poll(new HttpPollConfig<Boolean>(SERVICE_UP) - * .onSuccess(HttpValueFunctions.responseCodeEquals(200)) - * .onError(Functions.constant(false))) - * .poll(new HttpPollConfig<Integer>(REQUEST_COUNT) - * .onSuccess(HttpValueFunctions.jsonContents("requestCount", Integer.class))) - * .build(); - * } - * - * {@literal @}Override - * protected void disconnectSensors() { - * super.disconnectSensors(); - * if (feed != null) feed.stop(); - * } - * } - * </pre> - * <p> - * - * This also supports giving a Supplier for the URL - * (e.g. {@link Entities#attributeSupplier(org.apache.brooklyn.api.entity.Entity, org.apache.brooklyn.api.event.AttributeSensor)}) - * from a sensor. Note however that if a Supplier-based sensor is *https*, - * https-specific initialization may not occur if the URL is not available at start time, - * and it may report errors if that sensor is not available. - * Some guidance for controlling enablement of a feed based on availability of a sensor - * can be seen in HttpLatencyDetector (in brooklyn-policy). 
- * - * @author aled - */ -public class HttpFeed extends AbstractFeed { - - public static final Logger log = LoggerFactory.getLogger(HttpFeed.class); - - @SuppressWarnings("serial") - public static final ConfigKey<SetMultimap<HttpPollIdentifier, HttpPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( - new TypeToken<SetMultimap<HttpPollIdentifier, HttpPollConfig<?>>>() {}, - "polls"); - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - private EntityLocal entity; - private boolean onlyIfServiceUp = false; - private Supplier<URI> baseUriProvider; - private Duration period = Duration.millis(500); - private List<HttpPollConfig<?>> polls = Lists.newArrayList(); - private URI baseUri; - private Map<String, String> baseUriVars = Maps.newLinkedHashMap(); - private Map<String, String> headers = Maps.newLinkedHashMap(); - private boolean suspended = false; - private Credentials credentials; - private String uniqueTag; - private volatile boolean built; - - public Builder entity(EntityLocal val) { - this.entity = val; - return this; - } - public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } - public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { - this.onlyIfServiceUp = onlyIfServiceUp; - return this; - } - public Builder baseUri(Supplier<URI> val) { - if (baseUri!=null && val!=null) - throw new IllegalStateException("Builder cannot take both a URI and a URI Provider"); - this.baseUriProvider = val; - return this; - } - public Builder baseUri(URI val) { - if (baseUriProvider!=null && val!=null) - throw new IllegalStateException("Builder cannot take both a URI and a URI Provider"); - this.baseUri = val; - return this; - } - public Builder baseUrl(URL val) { - return baseUri(URI.create(val.toString())); - } - public Builder baseUri(String val) { - return baseUri(URI.create(val)); - } - public Builder baseUriVars(Map<String,String> vals) { - baseUriVars.putAll(vals); - return this; - } - public Builder 
baseUriVar(String key, String val) { - baseUriVars.put(key, val); - return this; - } - public Builder headers(Map<String,String> vals) { - headers.putAll(vals); - return this; - } - public Builder header(String key, String val) { - headers.put(key, val); - return this; - } - public Builder period(Duration duration) { - this.period = duration; - return this; - } - public Builder period(long millis) { - return period(millis, TimeUnit.MILLISECONDS); - } - public Builder period(long val, TimeUnit units) { - return period(Duration.of(val, units)); - } - public Builder poll(HttpPollConfig<?> config) { - polls.add(config); - return this; - } - public Builder suspended() { - return suspended(true); - } - public Builder suspended(boolean startsSuspended) { - this.suspended = startsSuspended; - return this; - } - public Builder credentials(String username, String password) { - this.credentials = new UsernamePasswordCredentials(username, password); - return this; - } - public Builder credentialsIfNotNull(String username, String password) { - if (username != null && password != null) { - this.credentials = new UsernamePasswordCredentials(username, password); - } - return this; - } - public Builder uniqueTag(String uniqueTag) { - this.uniqueTag = uniqueTag; - return this; - } - public HttpFeed build() { - built = true; - HttpFeed result = new HttpFeed(this); - result.setEntity(checkNotNull(entity, "entity")); - if (suspended) result.suspend(); - result.start(); - return result; - } - @Override - protected void finalize() { - if (!built) log.warn("HttpFeed.Builder created, but build() never called"); - } - } - - private static class HttpPollIdentifier { - final String method; - final Supplier<URI> uriProvider; - final Map<String,String> headers; - final byte[] body; - final Optional<Credentials> credentials; - final Duration connectionTimeout; - final Duration socketTimeout; - private HttpPollIdentifier(String method, Supplier<URI> uriProvider, Map<String, String> headers, 
byte[] body, - Optional<Credentials> credentials, Duration connectionTimeout, Duration socketTimeout) { - this.method = checkNotNull(method, "method").toLowerCase(); - this.uriProvider = checkNotNull(uriProvider, "uriProvider"); - this.headers = checkNotNull(headers, "headers"); - this.body = body; - this.credentials = checkNotNull(credentials, "credentials"); - this.connectionTimeout = connectionTimeout; - this.socketTimeout = socketTimeout; - - if (!(this.method.equals("get") || this.method.equals("post"))) { - throw new IllegalArgumentException("Unsupported HTTP method (only supports GET and POST): "+method); - } - if (body != null && method.equalsIgnoreCase("get")) { - throw new IllegalArgumentException("Must not set body for http GET method"); - } - } - - @Override - public int hashCode() { - return Objects.hashCode(method, uriProvider, headers, body, credentials); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof HttpPollIdentifier)) { - return false; - } - HttpPollIdentifier o = (HttpPollIdentifier) other; - return Objects.equal(method, o.method) && - Objects.equal(uriProvider, o.uriProvider) && - Objects.equal(headers, o.headers) && - Objects.equal(body, o.body) && - Objects.equal(credentials, o.credentials); - } - } - - /** - * For rebind; do not call directly; use builder - */ - public HttpFeed() { - } - - protected HttpFeed(Builder builder) { - setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp); - Map<String,String> baseHeaders = ImmutableMap.copyOf(checkNotNull(builder.headers, "headers")); - - SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> polls = HashMultimap.<HttpPollIdentifier,HttpPollConfig<?>>create(); - for (HttpPollConfig<?> config : builder.polls) { - if (!config.isEnabled()) continue; - @SuppressWarnings({ "unchecked", "rawtypes" }) - HttpPollConfig<?> configCopy = new HttpPollConfig(config); - if (configCopy.getPeriod() < 0) configCopy.period(builder.period); - String method = config.getMethod(); - 
Map<String,String> headers = config.buildHeaders(baseHeaders); - byte[] body = config.getBody(); - Duration connectionTimeout = config.getConnectionTimeout(); - Duration socketTimeout = config.getSocketTimeout(); - - Optional<Credentials> credentials = Optional.fromNullable(builder.credentials); - - Supplier<URI> baseUriProvider = builder.baseUriProvider; - if (builder.baseUri!=null) { - if (baseUriProvider!=null) - throw new IllegalStateException("Not permitted to supply baseUri and baseUriProvider"); - Map<String,String> baseUriVars = ImmutableMap.copyOf(checkNotNull(builder.baseUriVars, "baseUriVars")); - URI uri = config.buildUri(builder.baseUri, baseUriVars); - baseUriProvider = Suppliers.ofInstance(uri); - } else if (!builder.baseUriVars.isEmpty()) { - throw new IllegalStateException("Not permitted to supply URI vars when using a URI provider; pass the vars to the provider instead"); - } - checkNotNull(baseUriProvider); - - polls.put(new HttpPollIdentifier(method, baseUriProvider, headers, body, credentials, connectionTimeout, socketTimeout), configCopy); - } - setConfig(POLLS, polls); - initUniqueTag(builder.uniqueTag, polls.values()); - } - - @Override - protected void preStart() { - SetMultimap<HttpPollIdentifier, HttpPollConfig<?>> polls = getConfig(POLLS); - - for (final HttpPollIdentifier pollInfo : polls.keySet()) { - // Though HttpClients are thread safe and can take advantage of connection pooling - // and authentication caching, the httpcomponents documentation says: - // "While HttpClient instances are thread safe and can be shared between multiple - // threads of execution, it is highly recommended that each thread maintains its - // own dedicated instance of HttpContext. - // http://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html - final HttpClient httpClient = createHttpClient(pollInfo); - - Set<HttpPollConfig<?>> configs = polls.get(pollInfo); - long minPeriod = Integer.MAX_VALUE; - Set<AttributePollHandler<? 
super HttpToolResponse>> handlers = Sets.newLinkedHashSet(); - - for (HttpPollConfig<?> config : configs) { - handlers.add(new AttributePollHandler<HttpToolResponse>(config, entity, this)); - if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); - } - - Callable<HttpToolResponse> pollJob; - - if (pollInfo.method.equals("get")) { - pollJob = new Callable<HttpToolResponse>() { - public HttpToolResponse call() throws Exception { - if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo); - return HttpTool.httpGet(httpClient, pollInfo.uriProvider.get(), pollInfo.headers); - }}; - } else if (pollInfo.method.equals("post")) { - pollJob = new Callable<HttpToolResponse>() { - public HttpToolResponse call() throws Exception { - if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo); - return HttpTool.httpPost(httpClient, pollInfo.uriProvider.get(), pollInfo.headers, pollInfo.body); - }}; - } else if (pollInfo.method.equals("head")) { - pollJob = new Callable<HttpToolResponse>() { - public HttpToolResponse call() throws Exception { - if (log.isTraceEnabled()) log.trace("http polling for {} sensors at {}", entity, pollInfo); - return HttpTool.httpHead(httpClient, pollInfo.uriProvider.get(), pollInfo.headers); - }}; - } else { - throw new IllegalStateException("Unexpected http method: "+pollInfo.method); - } - - getPoller().scheduleAtFixedRate(pollJob, new DelegatingPollHandler<HttpToolResponse>(handlers), minPeriod); - } - } - - // TODO Should we really trustAll for https? Make configurable? 
- private HttpClient createHttpClient(HttpPollIdentifier pollIdentifier) { - URI uri = pollIdentifier.uriProvider.get(); - HttpClientBuilder builder = HttpTool.httpClientBuilder() - .trustAll() - .laxRedirect(true); - if (uri != null) builder.uri(uri); - if (uri != null) builder.credential(pollIdentifier.credentials); - if (pollIdentifier.connectionTimeout != null) { - builder.connectionTimeout(pollIdentifier.connectionTimeout); - } - if (pollIdentifier.socketTimeout != null) { - builder.socketTimeout(pollIdentifier.socketTimeout); - } - return builder.build(); - } - - @SuppressWarnings("unchecked") - protected Poller<HttpToolResponse> getPoller() { - return (Poller<HttpToolResponse>) super.getPoller(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpPollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpPollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpPollConfig.java deleted file mode 100644 index eead23f..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpPollConfig.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.sensor.feed.http; - -import java.net.URI; -import java.util.Map; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.feed.PollConfig; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableMap; -import org.apache.brooklyn.util.core.http.HttpTool; -import org.apache.brooklyn.util.core.http.HttpToolResponse; -import org.apache.brooklyn.util.time.Duration; - -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; - -public class HttpPollConfig<T> extends PollConfig<HttpToolResponse, T, HttpPollConfig<T>> { - - private String method = "GET"; - private String suburl = ""; - private Map<String, String> vars = ImmutableMap.<String,String>of(); - private Map<String, String> headers = ImmutableMap.<String,String>of(); - private byte[] body; - private Duration connectionTimeout; - private Duration socketTimeout; - - public static final Predicate<HttpToolResponse> DEFAULT_SUCCESS = new Predicate<HttpToolResponse>() { - @Override - public boolean apply(@Nullable HttpToolResponse input) { - return input != null && input.getResponseCode() >= 200 && input.getResponseCode() <= 399; - }}; - - public static <T> HttpPollConfig<T> forSensor(AttributeSensor<T> sensor) { - return new HttpPollConfig<T>(sensor); - } - - public static HttpPollConfig<Void> forMultiple() { - return new HttpPollConfig<Void>(PollConfig.NO_SENSOR); - } - - public HttpPollConfig(AttributeSensor<T> sensor) { - super(sensor); - super.checkSuccess(DEFAULT_SUCCESS); - } - - public HttpPollConfig(HttpPollConfig<T> other) { - super(other); - suburl = other.suburl; - vars = other.vars; - method = other.method; - headers = other.headers; - } - - public String getSuburl() { - return suburl; - } - - public Map<String, String> getVars() { - return vars; - } - - public Duration getConnectionTimeout() { - return 
connectionTimeout; - } - - public Duration getSocketTimeout() { - return socketTimeout; - } - - public String getMethod() { - return method; - } - - public byte[] getBody() { - return body; - } - - public HttpPollConfig<T> method(String val) { - this.method = val; return this; - } - - public HttpPollConfig<T> suburl(String val) { - this.suburl = val; return this; - } - - public HttpPollConfig<T> vars(Map<String,String> val) { - this.vars = val; return this; - } - - public HttpPollConfig<T> headers(Map<String,String> val) { - this.headers = val; return this; - } - - public HttpPollConfig<T> body(byte[] val) { - this.body = val; return this; - } - public HttpPollConfig<T> connectionTimeout(Duration val) { - this.connectionTimeout = val; - return this; - } - public HttpPollConfig<T> socketTimeout(Duration val) { - this.socketTimeout = val; - return this; - } - public URI buildUri(URI baseUri, Map<String,String> baseUriVars) { - String uri = (baseUri != null ? baseUri.toString() : "") + (suburl != null ? suburl : ""); - Map<String,String> allvars = concat(baseUriVars, vars); - - if (allvars != null && allvars.size() > 0) { - uri += "?" + HttpTool.encodeUrlParams(allvars); - } - - return URI.create(uri); - } - - public Map<String, String> buildHeaders(Map<String, String> baseHeaders) { - return MutableMap.<String,String>builder() - .putAll(baseHeaders) - .putAll(headers) - .build(); - } - - @SuppressWarnings("unchecked") - private <K,V> Map<K,V> concat(Map<? extends K,? extends V> map1, Map<? extends K,? 
extends V> map2) { - if (map1 == null || map1.isEmpty()) return (Map<K,V>) map2; - if (map2 == null || map2.isEmpty()) return (Map<K,V>) map1; - - // TODO Not using Immutable builder, because that fails if duplicates in map1 and map2 - return MutableMap.<K,V>builder().putAll(map1).putAll(map2).build(); - } - - @Override protected String toStringBaseName() { return "http"; } - @Override protected String toStringPollSource() { return suburl; } - @Override - protected MutableList<Object> equalsFields() { - return super.equalsFields().appendIfNotNull(method).appendIfNotNull(vars).appendIfNotNull(headers) - .appendIfNotNull(body).appendIfNotNull(connectionTimeout).appendIfNotNull(socketTimeout); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpPollValue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpPollValue.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpPollValue.java deleted file mode 100644 index 04432bd..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpPollValue.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
/**
 * Transitional view of an http response: status line, timing figures, headers and content.
 *
 * @deprecated since 0.7.0, use {@link HttpToolResponse}.
 * the old {@link HttpPollValue} concrete class has been renamed {@link HttpToolResponse}
 * because it has nothing specific to polls. this is now just a transitional interface.
 */
@Deprecated
public interface HttpPollValue {

    // -- status line --
    int getResponseCode();
    String getReasonPhrase();

    // -- timing --
    long getStartTime();
    long getLatencyFullContent();
    long getLatencyFirstResponse();

    // -- payload --
    Map<String, List<String>> getHeaderLists();
    byte[] getContent();

}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.http; - -import java.net.URI; - -import org.apache.brooklyn.util.core.http.HttpTool; -import org.apache.brooklyn.util.core.http.HttpToolResponse; -import org.apache.http.impl.client.DefaultHttpClient; - -import com.google.common.collect.ImmutableMap; - -/** - * @deprecated since 0.7; use {@link HttpTool} - */ -@Deprecated -public class HttpPolls { - - public static HttpToolResponse executeSimpleGet(URI uri) { - return HttpTool.httpGet(new DefaultHttpClient(), uri, ImmutableMap.<String,String>of()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpValueFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpValueFunctions.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpValueFunctions.java deleted file mode 100644 index 3fcbe07..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/HttpValueFunctions.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.http; - -import java.util.List; - -import org.apache.brooklyn.util.core.http.HttpToolResponse; -import org.apache.brooklyn.util.guava.Functionals; - -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Predicates; -import com.google.common.collect.Lists; -import com.google.gson.JsonElement; - -public class HttpValueFunctions { - - private HttpValueFunctions() {} // instead use static utility methods - - public static Function<HttpToolResponse, Integer> responseCode() { - return new ResponseCode(); - } - - /** @deprecated since 0.7.0; only here for deserialization of persisted state */ - private static Function<HttpToolResponse, Integer> responseCodeLegacy() { - return new Function<HttpToolResponse, Integer>() { - @Override public Integer apply(HttpToolResponse input) { - return input.getResponseCode(); - } - }; - } - - private static class ResponseCode implements Function<HttpToolResponse, Integer> { - @Override public Integer apply(HttpToolResponse input) { - return input.getResponseCode(); - } - } - - public static Function<HttpToolResponse, Boolean> responseCodeEquals(final int expected) { - return Functionals.chain(HttpValueFunctions.responseCode(), Functions.forPredicate(Predicates.equalTo(expected))); - } - - public static Function<HttpToolResponse, Boolean> responseCodeEquals(final int... 
expected) { - List<Integer> expectedList = Lists.newArrayList(); - for (int e : expected) { - expectedList.add((Integer)e); - } - return Functionals.chain(HttpValueFunctions.responseCode(), Functions.forPredicate(Predicates.in(expectedList))); - } - - public static Function<HttpToolResponse, String> stringContentsFunction() { - return new StringContents(); - } - - /** @deprecated since 0.7.0; only here for deserialization of persisted state */ - private static Function<HttpToolResponse, String> stringContentsFunctionLegacy() { - return new Function<HttpToolResponse, String>() { - @Override public String apply(HttpToolResponse input) { - return input.getContentAsString(); - } - }; - } - - private static class StringContents implements Function<HttpToolResponse, String> { - @Override public String apply(HttpToolResponse input) { - return input.getContentAsString(); - } - } - - public static Function<HttpToolResponse, JsonElement> jsonContents() { - return Functionals.chain(stringContentsFunction(), JsonFunctions.asJson()); - } - - public static <T> Function<HttpToolResponse, T> jsonContents(String element, Class<T> expected) { - return jsonContents(new String[] {element}, expected); - } - - public static <T> Function<HttpToolResponse, T> jsonContents(String[] elements, Class<T> expected) { - return Functionals.chain(jsonContents(), JsonFunctions.walk(elements), JsonFunctions.cast(expected)); - } - - public static <T> Function<HttpToolResponse, T> jsonContentsFromPath(String path){ - return Functionals.chain(jsonContents(), JsonFunctions.<T>getPath(path)); - } - - public static Function<HttpToolResponse, Long> latency() { - return new Latency(); - } - - /** @deprecated since 0.7.0; only here for deserialization of persisted state */ - private static Function<HttpToolResponse, Long> latencyLegacy() { - return new Function<HttpToolResponse, Long>() { - public Long apply(HttpToolResponse input) { - return input.getLatencyFullContent(); - } - }; - } - - private static 
class Latency implements Function<HttpToolResponse, Long> { - public Long apply(HttpToolResponse input) { - return input.getLatencyFullContent(); - } - }; - - public static Function<HttpToolResponse, Boolean> containsHeader(String header) { - return new ContainsHeader(header); - } - - private static class ContainsHeader implements Function<HttpToolResponse, Boolean> { - private final String header; - - public ContainsHeader(String header) { - this.header = header; - } - @Override - public Boolean apply(HttpToolResponse input) { - List<String> actual = input.getHeaderLists().get(header); - return actual != null && actual.size() > 0; - } - } - - - /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function)} */ @Deprecated - public static <A,B,C> Function<A,C> chain(final Function<A,? extends B> f1, final Function<B,C> f2) { - return Functionals.chain(f1, f2); - } - - /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function, Function)} */ @Deprecated - public static <A,B,C,D> Function<A,D> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,D> f3) { - return Functionals.chain(f1, f2, f3); - } - - /** @deprecated since 0.7.0 use {@link Functionals#chain(Function, Function, Function, Function)} */ @Deprecated - public static <A,B,C,D,E> Function<A,E> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,? 
extends D> f3, final Function<D,E> f4) { - return Functionals.chain(f1, f2, f3, f4); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/http/JsonFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/JsonFunctions.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/http/JsonFunctions.java deleted file mode 100644 index f84109f..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/http/JsonFunctions.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.sensor.feed.http; - -import java.lang.reflect.Array; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.Arrays; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.util.guava.Functionals; -import org.apache.brooklyn.util.guava.Maybe; -import org.apache.brooklyn.util.guava.MaybeFunctions; - -import com.google.common.base.Function; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.gson.*; -import com.jayway.jsonpath.JsonPath; - -public class JsonFunctions { - - private JsonFunctions() {} // instead use static utility methods - - public static Function<String, JsonElement> asJson() { - return new Function<String, JsonElement>() { - @Override public JsonElement apply(String input) { - return new JsonParser().parse(input); - } - }; - } - - public static <T> Function<JsonElement, List<T>> forEach(final Function<JsonElement, T> func) { - return new Function<JsonElement, List<T>>() { - @Override public List<T> apply(JsonElement input) { - JsonArray array = (JsonArray) input; - List<T> result = Lists.newArrayList(); - for (int i = 0; i < array.size(); i++) { - result.add(func.apply(array.get(i))); - } - return result; - } - }; - } - - - /** as {@link #walkM(Iterable)} taking a single string consisting of a dot separated path */ - public static Function<JsonElement, JsonElement> walk(String elementOrDotSeparatedElements) { - return walk( Splitter.on('.').split(elementOrDotSeparatedElements) ); - } - - /** as {@link #walkM(Iterable)} taking a series of strings (dot separators not respected here) */ - public static Function<JsonElement, JsonElement> walk(final String... 
elements) { - return walk(Arrays.asList(elements)); - } - - /** returns a function which traverses the supplied path of entries in a json object (maps of maps of maps...), - * @throws NoSuchElementException if any path is not present as a key in that map */ - public static Function<JsonElement, JsonElement> walk(final Iterable<String> elements) { - // could do this instead, pointing at Maybe for this, and for walkN, but it's slightly less efficient -// return Functionals.chain(MaybeFunctions.<JsonElement>wrap(), walkM(elements), MaybeFunctions.<JsonElement>get()); - - return new Function<JsonElement, JsonElement>() { - @Override public JsonElement apply(JsonElement input) { - JsonElement curr = input; - for (String element : elements) { - JsonObject jo = curr.getAsJsonObject(); - curr = jo.get(element); - if (curr==null) - throw new NoSuchElementException("No element '"+element+" in JSON, when walking "+elements); - } - return curr; - } - }; - } - - - /** as {@link #walk(String)} but if any element is not found it simply returns null */ - public static Function<JsonElement, JsonElement> walkN(@Nullable String elements) { - return walkN( Splitter.on('.').split(elements) ); - } - - /** as {@link #walk(String...))} but if any element is not found it simply returns null */ - public static Function<JsonElement, JsonElement> walkN(final String... 
elements) { - return walkN(Arrays.asList(elements)); - } - - /** as {@link #walk(Iterable))} but if any element is not found it simply returns null */ - public static Function<JsonElement, JsonElement> walkN(final Iterable<String> elements) { - return new Function<JsonElement, JsonElement>() { - @Override public JsonElement apply(JsonElement input) { - JsonElement curr = input; - for (String element : elements) { - if (curr==null) return null; - JsonObject jo = curr.getAsJsonObject(); - curr = jo.get(element); - } - return curr; - } - }; - } - - /** as {@link #walk(String))} and {@link #walk(Iterable)} */ - public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(@Nullable String elements) { - return walkM( Splitter.on('.').split(elements) ); - } - - /** as {@link #walk(String...))} and {@link #walk(Iterable)} */ - public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(final String... elements) { - return walkM(Arrays.asList(elements)); - } - - /** as {@link #walk(Iterable))} but working with objects which {@link Maybe} contain {@link JsonElement}, - * simply preserving a {@link Maybe#absent()} object if additional walks are requested upon it - * (cf jquery) */ - public static Function<Maybe<JsonElement>, Maybe<JsonElement>> walkM(final Iterable<String> elements) { - return new Function<Maybe<JsonElement>, Maybe<JsonElement>>() { - @Override public Maybe<JsonElement> apply(Maybe<JsonElement> input) { - Maybe<JsonElement> curr = input; - for (String element : elements) { - if (curr.isAbsent()) return curr; - JsonObject jo = curr.get().getAsJsonObject(); - JsonElement currO = jo.get(element); - if (currO==null) return Maybe.absent("No element '"+element+" in JSON, when walking "+elements); - curr = Maybe.of(currO); - } - return curr; - } - }; - } - - /** - * returns an element from a single json primitive value given a full path {@link com.jayway.jsonpath.JsonPath} - */ - public static <T> Function<JsonElement,T> getPath(final String path) { 
- return new Function<JsonElement, T>() { - @SuppressWarnings("unchecked") - @Override public T apply(JsonElement input) { - String jsonString = input.toString(); - Object rawElement = JsonPath.read(jsonString, path); - return (T) rawElement; - } - }; - } - - @SuppressWarnings("unchecked") - public static <T> Function<JsonElement, T> cast(final Class<T> expected) { - return new Function<JsonElement, T>() { - @Override public T apply(JsonElement input) { - if (input == null) { - return (T) null; - } else if (input.isJsonNull()) { - return (T) null; - } else if (expected == boolean.class || expected == Boolean.class) { - return (T) (Boolean) input.getAsBoolean(); - } else if (expected == char.class || expected == Character.class) { - return (T) (Character) input.getAsCharacter(); - } else if (expected == byte.class || expected == Byte.class) { - return (T) (Byte) input.getAsByte(); - } else if (expected == short.class || expected == Short.class) { - return (T) (Short) input.getAsShort(); - } else if (expected == int.class || expected == Integer.class) { - return (T) (Integer) input.getAsInt(); - } else if (expected == long.class || expected == Long.class) { - return (T) (Long) input.getAsLong(); - } else if (expected == float.class || expected == Float.class) { - return (T) (Float) input.getAsFloat(); - } else if (expected == double.class || expected == Double.class) { - return (T) (Double) input.getAsDouble(); - } else if (expected == BigDecimal.class) { - return (T) input.getAsBigDecimal(); - } else if (expected == BigInteger.class) { - return (T) input.getAsBigInteger(); - } else if (Number.class.isAssignableFrom(expected)) { - // TODO Will result in a class-cast if it's an unexpected sub-type of Number not handled above - return (T) input.getAsNumber(); - } else if (expected == String.class) { - return (T) input.getAsString(); - } else if (expected.isArray()) { - JsonArray array = input.getAsJsonArray(); - Class<?> componentType = expected.getComponentType(); - 
if (JsonElement.class.isAssignableFrom(componentType)) { - JsonElement[] result = new JsonElement[array.size()]; - for (int i = 0; i < array.size(); i++) { - result[i] = array.get(i); - } - return (T) result; - } else { - Object[] result = (Object[]) Array.newInstance(componentType, array.size()); - for (int i = 0; i < array.size(); i++) { - result[i] = cast(componentType).apply(array.get(i)); - } - return (T) result; - } - } else { - throw new IllegalArgumentException("Cannot cast json element to type "+expected); - } - } - }; - } - - public static <T> Function<Maybe<JsonElement>, T> castM(final Class<T> expected) { - return Functionals.chain(MaybeFunctions.<JsonElement>get(), cast(expected)); - } - - public static <T> Function<Maybe<JsonElement>, T> castM(final Class<T> expected, final T defaultValue) { - return new Function<Maybe<JsonElement>, T>() { - @Override - public T apply(Maybe<JsonElement> input) { - if (input.isAbsent()) return defaultValue; - return cast(expected).apply(input.get()); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/shell/ShellFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/shell/ShellFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/shell/ShellFeed.java deleted file mode 100644 index 19e5bf5..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/shell/ShellFeed.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.shell; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.File; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.api.mgmt.ExecutionContext; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.entity.EntityInternal; -import org.apache.brooklyn.sensor.feed.AbstractFeed; -import org.apache.brooklyn.sensor.feed.AttributePollHandler; -import org.apache.brooklyn.sensor.feed.DelegatingPollHandler; -import org.apache.brooklyn.sensor.feed.Poller; -import org.apache.brooklyn.sensor.feed.function.FunctionFeed; -import org.apache.brooklyn.sensor.feed.ssh.SshFeed; -import org.apache.brooklyn.sensor.feed.ssh.SshPollValue; -import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory; -import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper; -import org.apache.brooklyn.util.core.task.system.internal.SystemProcessTaskFactory.ConcreteSystemProcessTaskFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Objects; -import com.google.common.base.Optional; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; -import 
com.google.common.reflect.TypeToken; - -/** - * Provides a feed of attribute values, by executing shell commands (on the local machine where - * this instance of brooklyn is running). Useful e.g. for paas tools such as Cloud Foundry vmc - * which operate against a remote target. - * - * Example usage (e.g. in an entity that extends SoftwareProcessImpl): - * <pre> - * {@code - * private ShellFeed feed; - * - * //@Override - * protected void connectSensors() { - * super.connectSensors(); - * - * feed = ShellFeed.builder() - * .entity(this) - * .machine(mySshMachineLachine) - * .poll(new ShellPollConfig<Long>(DISK_USAGE) - * .command("df -P | grep /dev") - * .failOnNonZeroResultCode(true) - * .onSuccess(new Function<SshPollValue, Long>() { - * public Long apply(SshPollValue input) { - * String[] parts = input.getStdout().split("[ \\t]+"); - * return Long.parseLong(parts[2]); - * }})) - * .build(); - * } - * - * {@literal @}Override - * protected void disconnectSensors() { - * super.disconnectSensors(); - * if (feed != null) feed.stop(); - * } - * } - * </pre> - * - * @see SshFeed (to run on remote machines) - * @see FunctionFeed (for arbitrary functions) - * - * @author aled - */ -public class ShellFeed extends AbstractFeed { - - public static final Logger log = LoggerFactory.getLogger(ShellFeed.class); - - @SuppressWarnings("serial") - private static final ConfigKey<SetMultimap<ShellPollIdentifier, ShellPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( - new TypeToken<SetMultimap<ShellPollIdentifier, ShellPollConfig<?>>>() {}, - "polls"); - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - private EntityLocal entity; - private long period = 500; - private TimeUnit periodUnits = TimeUnit.MILLISECONDS; - private List<ShellPollConfig<?>> polls = Lists.newArrayList(); - private String uniqueTag; - private volatile boolean built; - - public Builder entity(EntityLocal val) { - this.entity = val; - return this; - } - public 
Builder period(long millis) { - return period(millis, TimeUnit.MILLISECONDS); - } - public Builder period(long val, TimeUnit units) { - this.period = val; - this.periodUnits = units; - return this; - } - public Builder poll(ShellPollConfig<?> config) { - polls.add(config); - return this; - } - public Builder uniqueTag(String uniqueTag) { - this.uniqueTag = uniqueTag; - return this; - } - public ShellFeed build() { - built = true; - ShellFeed result = new ShellFeed(this); - result.setEntity(checkNotNull(entity, "entity")); - result.start(); - return result; - } - @Override - protected void finalize() { - if (!built) log.warn("ShellFeed.Builder created, but build() never called"); - } - } - - private static class ShellPollIdentifier { - final String command; - final Map<String, String> env; - final File dir; - final String input; - final String context; - final long timeout; - - private ShellPollIdentifier(String command, Map<String, String> env, File dir, String input, String context, long timeout) { - this.command = checkNotNull(command, "command"); - this.env = checkNotNull(env, "env"); - this.dir = dir; - this.input = input; - this.context = checkNotNull(context, "context"); - this.timeout = timeout; - } - - @Override - public int hashCode() { - return Objects.hashCode(command, env, dir, input, timeout); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof ShellPollIdentifier)) { - return false; - } - ShellPollIdentifier o = (ShellPollIdentifier) other; - return Objects.equal(command, o.command) && - Objects.equal(env, o.env) && - Objects.equal(dir, o.dir) && - Objects.equal(input, o.input) && - Objects.equal(timeout, o.timeout); - } - } - - /** - * For rebind; do not call directly; use builder - */ - public ShellFeed() { - } - - protected ShellFeed(Builder builder) { - super(); - - SetMultimap<ShellPollIdentifier, ShellPollConfig<?>> polls = HashMultimap.<ShellPollIdentifier,ShellPollConfig<?>>create(); - for (ShellPollConfig<?> 
config : builder.polls) { - if (!config.isEnabled()) continue; - @SuppressWarnings({ "unchecked", "rawtypes" }) - ShellPollConfig<?> configCopy = new ShellPollConfig(config); - if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits); - String command = config.getCommand(); - Map<String, String> env = config.getEnv(); - File dir = config.getDir(); - String input = config.getInput(); - String context = config.getSensor().getName(); - long timeout = config.getTimeout(); - - polls.put(new ShellPollIdentifier(command, env, dir, input, context, timeout), configCopy); - } - setConfig(POLLS, polls); - initUniqueTag(builder.uniqueTag, polls.values()); - } - - @Override - protected void preStart() { - SetMultimap<ShellPollIdentifier, ShellPollConfig<?>> polls = getConfig(POLLS); - - for (final ShellPollIdentifier pollInfo : polls.keySet()) { - Set<ShellPollConfig<?>> configs = polls.get(pollInfo); - long minPeriod = Integer.MAX_VALUE; - Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet(); - - for (ShellPollConfig<?> config : configs) { - handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this)); - if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); - } - - final ProcessTaskFactory<?> taskFactory = newTaskFactory(pollInfo.command, pollInfo.env, pollInfo.dir, - pollInfo.input, pollInfo.context, pollInfo.timeout); - final ExecutionContext executionContext = ((EntityInternal) entity).getManagementSupport().getExecutionContext(); - - getPoller().scheduleAtFixedRate( - new Callable<SshPollValue>() { - @Override public SshPollValue call() throws Exception { - ProcessTaskWrapper<?> taskWrapper = taskFactory.newTask(); - executionContext.submit(taskWrapper); - taskWrapper.block(); - Optional<Integer> exitCode = Optional.fromNullable(taskWrapper.getExitCode()); - return new SshPollValue(null, exitCode.or(-1), taskWrapper.getStdout(), taskWrapper.getStderr()); - }}, - new 
DelegatingPollHandler<SshPollValue>(handlers), - minPeriod); - } - } - - @SuppressWarnings("unchecked") - protected Poller<SshPollValue> getPoller() { - return (Poller<SshPollValue>) super.getPoller(); - } - - /** - * Executes the given command (using `bash -l -c $command`, so as to have a good path set). - * - * @param command The command to execute - * @param env Environment variable settings, in format name=value - * @param dir Working directory, or null to inherit from current process - * @param input Input to send to the command (if not null) - */ - protected ProcessTaskFactory<?> newTaskFactory(final String command, Map<String,String> env, File dir, String input, final String summary, final long timeout) { - // FIXME Add generic timeout() support to task ExecutionManager - if (timeout > 0) { - log.warn("Timeout ({}ms) not currently supported for ShellFeed {}", timeout, this); - } - - return new ConcreteSystemProcessTaskFactory<Object>(command) - .environmentVariables(env) - .loginShell(true) - .directory(dir) - .runAsCommand() - .summary(summary); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/shell/ShellPollConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/shell/ShellPollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/shell/ShellPollConfig.java deleted file mode 100644 index 782b094..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/shell/ShellPollConfig.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.shell; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.File; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.feed.PollConfig; -import org.apache.brooklyn.sensor.feed.ssh.SshPollValue; -import org.apache.brooklyn.util.collections.MutableList; - -import com.google.common.base.Predicate; -import com.google.common.collect.Maps; - -public class ShellPollConfig<T> extends PollConfig<SshPollValue, T, ShellPollConfig<T>> { - - private String command; - private Map<String,String> env = Maps.newLinkedHashMap(); - private long timeout = -1; - private File dir; - private String input; - - public static final Predicate<SshPollValue> DEFAULT_SUCCESS = new Predicate<SshPollValue>() { - @Override - public boolean apply(@Nullable SshPollValue input) { - return input != null && input.getExitStatus() == 0; - }}; - - public ShellPollConfig(AttributeSensor<T> sensor) { - super(sensor); - super.checkSuccess(DEFAULT_SUCCESS); - } - - public ShellPollConfig(ShellPollConfig<T> other) { - super(other); - command = other.command; - env = other.env; - timeout = other.timeout; - dir = other.dir; - input = other.input; - } - - public String getCommand() { - return command; - } - - public 
Map<String, String> getEnv() { - return env; - } - - public File getDir() { - return dir; - } - - public String getInput() { - return input; - } - - public long getTimeout() { - return timeout; - } - - public ShellPollConfig<T> command(String val) { - this.command = val; - return this; - } - - public ShellPollConfig<T> env(String key, String val) { - env.put(checkNotNull(key, "key"), checkNotNull(val, "val")); - return this; - } - - public ShellPollConfig<T> env(Map<String,String> val) { - for (Map.Entry<String, String> entry : checkNotNull(val, "map").entrySet()) { - env(entry.getKey(), entry.getValue()); - } - return this; - } - - public ShellPollConfig<T> dir(File val) { - this.dir = val; - return this; - } - - public ShellPollConfig<T> input(String val) { - this.input = val; - return this; - } - - public ShellPollConfig<T> timeout(long timeout) { - return timeout(timeout, TimeUnit.MILLISECONDS); - } - - public ShellPollConfig<T> timeout(long timeout, TimeUnit units) { - this.timeout = units.toMillis(timeout); - return this; - } - - @Override protected String toStringBaseName() { return "shell"; } - @Override protected String toStringPollSource() { return command; } - @Override protected MutableList<Object> equalsFields() { return super.equalsFields().appendIfNotNull(command); } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshFeed.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshFeed.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshFeed.java deleted file mode 100644 index 6e5a485..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshFeed.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.ssh; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.entity.EntityLocal; -import org.apache.brooklyn.config.ConfigKey; -import org.apache.brooklyn.core.config.ConfigKeys; -import org.apache.brooklyn.core.location.Locations; -import org.apache.brooklyn.core.location.Machines; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.brooklyn.location.ssh.SshMachineLocation; -import org.apache.brooklyn.sensor.feed.AbstractFeed; -import org.apache.brooklyn.sensor.feed.AttributePollHandler; -import org.apache.brooklyn.sensor.feed.DelegatingPollHandler; -import org.apache.brooklyn.sensor.feed.Poller; -import org.apache.brooklyn.util.core.config.ConfigBag; -import org.apache.brooklyn.util.core.internal.ssh.SshTool; -import org.apache.brooklyn.util.time.Duration; - -import com.google.common.base.Objects; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import 
com.google.common.collect.HashMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; -import com.google.common.reflect.TypeToken; - -/** - * Provides a feed of attribute values, by polling over ssh. - * - * Example usage (e.g. in an entity that extends SoftwareProcessImpl): - * <pre> - * {@code - * private SshFeed feed; - * - * //@Override - * protected void connectSensors() { - * super.connectSensors(); - * - * feed = SshFeed.builder() - * .entity(this) - * .machine(mySshMachineLachine) - * .poll(new SshPollConfig<Boolean>(SERVICE_UP) - * .command("rabbitmqctl -q status") - * .onSuccess(new Function<SshPollValue, Boolean>() { - * public Boolean apply(SshPollValue input) { - * return (input.getExitStatus() == 0); - * }})) - * .build(); - * } - * - * {@literal @}Override - * protected void disconnectSensors() { - * super.disconnectSensors(); - * if (feed != null) feed.stop(); - * } - * } - * </pre> - * - * @author aled - */ -public class SshFeed extends AbstractFeed { - - public static final Logger log = LoggerFactory.getLogger(SshFeed.class); - - @SuppressWarnings("serial") - public static final ConfigKey<Supplier<SshMachineLocation>> MACHINE = ConfigKeys.newConfigKey( - new TypeToken<Supplier<SshMachineLocation>>() {}, - "machine"); - - public static final ConfigKey<Boolean> EXEC_AS_COMMAND = ConfigKeys.newBooleanConfigKey("execAsCommand"); - - @SuppressWarnings("serial") - public static final ConfigKey<SetMultimap<SshPollIdentifier, SshPollConfig<?>>> POLLS = ConfigKeys.newConfigKey( - new TypeToken<SetMultimap<SshPollIdentifier, SshPollConfig<?>>>() {}, - "polls"); - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - private EntityLocal entity; - private boolean onlyIfServiceUp = false; - private Supplier<SshMachineLocation> machine; - private Duration period = Duration.of(500, 
TimeUnit.MILLISECONDS); - private List<SshPollConfig<?>> polls = Lists.newArrayList(); - private boolean execAsCommand = false; - private String uniqueTag; - private volatile boolean built; - - public Builder entity(EntityLocal val) { - this.entity = val; - return this; - } - public Builder onlyIfServiceUp() { return onlyIfServiceUp(true); } - public Builder onlyIfServiceUp(boolean onlyIfServiceUp) { - this.onlyIfServiceUp = onlyIfServiceUp; - return this; - } - /** optional, to force a machine; otherwise it is inferred from the entity */ - public Builder machine(SshMachineLocation val) { return machine(Suppliers.ofInstance(val)); } - /** optional, to force a machine; otherwise it is inferred from the entity */ - public Builder machine(Supplier<SshMachineLocation> val) { - this.machine = val; - return this; - } - public Builder period(Duration period) { - this.period = period; - return this; - } - public Builder period(long millis) { - return period(Duration.of(millis, TimeUnit.MILLISECONDS)); - } - public Builder period(long val, TimeUnit units) { - return period(Duration.of(val, units)); - } - public Builder poll(SshPollConfig<?> config) { - polls.add(config); - return this; - } - public Builder execAsCommand() { - execAsCommand = true; - return this; - } - public Builder execAsScript() { - execAsCommand = false; - return this; - } - public Builder uniqueTag(String uniqueTag) { - this.uniqueTag = uniqueTag; - return this; - } - public SshFeed build() { - built = true; - SshFeed result = new SshFeed(this); - result.setEntity(checkNotNull(entity, "entity")); - result.start(); - return result; - } - @Override - protected void finalize() { - if (!built) log.warn("SshFeed.Builder created, but build() never called"); - } - } - - private static class SshPollIdentifier { - final Supplier<String> command; - final Supplier<Map<String, String>> env; - - private SshPollIdentifier(Supplier<String> command, Supplier<Map<String, String>> env) { - this.command = 
checkNotNull(command, "command"); - this.env = checkNotNull(env, "env"); - } - - @Override - public int hashCode() { - return Objects.hashCode(command, env); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof SshPollIdentifier)) { - return false; - } - SshPollIdentifier o = (SshPollIdentifier) other; - return Objects.equal(command, o.command) && - Objects.equal(env, o.env); - } - } - - /** @deprecated since 0.7.0, use static convenience on {@link Locations} */ - @Deprecated - public static SshMachineLocation getMachineOfEntity(Entity entity) { - return Machines.findUniqueSshMachineLocation(entity.getLocations()).orNull(); - } - - /** - * For rebind; do not call directly; use builder - */ - public SshFeed() { - } - - protected SshFeed(final Builder builder) { - setConfig(ONLY_IF_SERVICE_UP, builder.onlyIfServiceUp); - setConfig(MACHINE, builder.machine != null ? builder.machine : null); - setConfig(EXEC_AS_COMMAND, builder.execAsCommand); - - SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = HashMultimap.<SshPollIdentifier,SshPollConfig<?>>create(); - for (SshPollConfig<?> config : builder.polls) { - @SuppressWarnings({ "unchecked", "rawtypes" }) - SshPollConfig<?> configCopy = new SshPollConfig(config); - if (configCopy.getPeriod() < 0) configCopy.period(builder.period); - polls.put(new SshPollIdentifier(config.getCommandSupplier(), config.getEnvSupplier()), configCopy); - } - setConfig(POLLS, polls); - initUniqueTag(builder.uniqueTag, polls.values()); - } - - protected SshMachineLocation getMachine() { - Supplier<SshMachineLocation> supplier = getConfig(MACHINE); - if (supplier != null) { - return supplier.get(); - } else { - return Locations.findUniqueSshMachineLocation(entity.getLocations()).get(); - } - } - - @Override - protected void preStart() { - SetMultimap<SshPollIdentifier, SshPollConfig<?>> polls = getConfig(POLLS); - - for (final SshPollIdentifier pollInfo : polls.keySet()) { - Set<SshPollConfig<?>> configs = 
polls.get(pollInfo); - long minPeriod = Integer.MAX_VALUE; - Set<AttributePollHandler<? super SshPollValue>> handlers = Sets.newLinkedHashSet(); - - for (SshPollConfig<?> config : configs) { - handlers.add(new AttributePollHandler<SshPollValue>(config, entity, this)); - if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod()); - } - - getPoller().scheduleAtFixedRate( - new Callable<SshPollValue>() { - public SshPollValue call() throws Exception { - return exec(pollInfo.command.get(), pollInfo.env.get()); - }}, - new DelegatingPollHandler<SshPollValue>(handlers), - minPeriod); - } - } - - @SuppressWarnings("unchecked") - protected Poller<SshPollValue> getPoller() { - return (Poller<SshPollValue>) super.getPoller(); - } - - private SshPollValue exec(String command, Map<String,String> env) throws IOException { - SshMachineLocation machine = getMachine(); - Boolean execAsCommand = getConfig(EXEC_AS_COMMAND); - if (log.isTraceEnabled()) log.trace("Ssh polling for {}, executing {} with env {}", new Object[] {machine, command, env}); - ByteArrayOutputStream stdout = new ByteArrayOutputStream(); - ByteArrayOutputStream stderr = new ByteArrayOutputStream(); - - int exitStatus; - ConfigBag flags = ConfigBag.newInstance() - .configure(SshTool.PROP_NO_EXTRA_OUTPUT, true) - .configure(SshTool.PROP_OUT_STREAM, stdout) - .configure(SshTool.PROP_ERR_STREAM, stderr); - if (Boolean.TRUE.equals(execAsCommand)) { - exitStatus = machine.execCommands(flags.getAllConfig(), - "ssh-feed", ImmutableList.of(command), env); - } else { - exitStatus = machine.execScript(flags.getAllConfig(), - "ssh-feed", ImmutableList.of(command), env); - } - - return new SshPollValue(machine, exitStatus, new String(stdout.toByteArray()), new String(stderr.toByteArray())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshPollConfig.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshPollConfig.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshPollConfig.java deleted file mode 100644 index b666d42..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshPollConfig.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.brooklyn.sensor.feed.ssh; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.sensor.feed.PollConfig; -import org.apache.brooklyn.util.collections.MutableList; -import org.apache.brooklyn.util.collections.MutableMap; - -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; - -public class SshPollConfig<T> extends PollConfig<SshPollValue, T, SshPollConfig<T>> { - - private Supplier<String> commandSupplier; - private List<Supplier<Map<String,String>>> dynamicEnvironmentSupplier = MutableList.of(); - - public static final Predicate<SshPollValue> DEFAULT_SUCCESS = new Predicate<SshPollValue>() { - @Override - public boolean apply(@Nullable SshPollValue input) { - return input != null && input.getExitStatus() == 0; - }}; - - public SshPollConfig(AttributeSensor<T> sensor) { - super(sensor); - super.checkSuccess(DEFAULT_SUCCESS); - } - - public SshPollConfig(SshPollConfig<T> other) { - super(other); - commandSupplier = other.commandSupplier; - } - - /** @deprecated since 0.7.0; use {@link #getCommandSupplier()} and resolve just-in-time */ - public String getCommand() { - return getCommandSupplier().get(); - } - public Supplier<String> getCommandSupplier() { - return commandSupplier; - } - - /** @deprecated since 0.7.0; use {@link #getEnvSupplier()} and resolve just-in-time */ - public Map<String, String> getEnv() { - return getEnvSupplier().get(); - } - public Supplier<Map<String,String>> getEnvSupplier() { - return new Supplier<Map<String,String>>() { - @Override - public Map<String, String> get() { - Map<String,String> result = MutableMap.of(); - for (Supplier<Map<String, String>> envS: dynamicEnvironmentSupplier) { - if (envS!=null) { - Map<String, String> envM = envS.get(); - if 
(envM!=null) { - mergeEnvMaps(envM, result); - } - } - } - return result; - } - }; - } - - protected void mergeEnvMaps(Map<String,String> supplied, Map<String,String> target) { - if (supplied==null) return; - // as the value is a string there is no need to look at deep merge behaviour - target.putAll(supplied); - } - - public SshPollConfig<T> command(String val) { return command(Suppliers.ofInstance(val)); } - public SshPollConfig<T> command(Supplier<String> val) { - this.commandSupplier = val; - return this; - } - - /** add the given env param; sequence is as per {@link #env(Supplier)} */ - public SshPollConfig<T> env(String key, String val) { - return env(Collections.singletonMap(key, val)); - } - - /** add the given env params; sequence is as per {@link #env(Supplier)}. - * behaviour is undefined if the map supplied here is subsequently changed. - * <p> - * if a map's contents might change, use {@link #env(Supplier)} */ - public SshPollConfig<T> env(Map<String,String> val) { - if (val==null) return this; - return env(Suppliers.ofInstance(val)); - } - - /** - * adds the given dynamic supplier of environment variables. - * <p> - * use of a supplier allows env vars to be computed on each execution, - * for example to take the most recent sensor values. - * <p> - * in the case of multiple map suppliers, static maps, or static {@link #env(String, String)} - * key value pairs, the order in which they are specified here is the order - * in which they are computed and applied. 
- **/ - public SshPollConfig<T> env(Supplier<Map<String,String>> val) { - Preconditions.checkNotNull(val); - dynamicEnvironmentSupplier.add(val); - return this; - } - - @Override protected String toStringBaseName() { return "ssh"; } - @Override protected Object toStringPollSource() { - if (getCommandSupplier()==null) return null; - String command = getCommandSupplier().get(); - return command; - } - @Override protected MutableList<Object> equalsFields() { - return super.equalsFields() - .appendIfNotNull(getCommandSupplier()!=null ? getCommandSupplier().get() : null) - .appendIfNotNull(getEnvSupplier()!=null ? getEnvSupplier().get() : null); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshPollValue.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshPollValue.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshPollValue.java deleted file mode 100644 index 8f1885d..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshPollValue.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.ssh; - -import javax.annotation.Nullable; - -import org.apache.brooklyn.location.ssh.SshMachineLocation; - -public class SshPollValue { - - private final SshMachineLocation machine; - private final int exitStatus; - private final String stdout; - private final String stderr; - - public SshPollValue(SshMachineLocation machine, int exitStatus, String stdout, String stderr) { - this.machine = machine; - this.exitStatus = exitStatus; - this.stdout = stdout; - this.stderr = stderr; - } - - /** The machine the command will run on. */ - public SshMachineLocation getMachine() { - return machine; - } - - /** Command exit status, or -1 if error is set. */ - public int getExitStatus() { - return exitStatus; - } - - /** Command standard output; may be null if no content available. */ - @Nullable - public String getStdout() { - return stdout; - } - - /** Command standard error; may be null if no content available. */ - @Nullable - public String getStderr() { - return stderr; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/daf40919/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshValueFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshValueFunctions.java b/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshValueFunctions.java deleted file mode 100644 index 9ef3048..0000000 --- a/core/src/main/java/org/apache/brooklyn/sensor/feed/ssh/SshValueFunctions.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.brooklyn.sensor.feed.ssh; - -import javax.annotation.Nullable; - -import com.google.common.base.Function; -import com.google.common.base.Functions; -import com.google.common.base.Predicates; - -public class SshValueFunctions { - - public static Function<SshPollValue, Integer> exitStatus() { - return new Function<SshPollValue, Integer>() { - @Override public Integer apply(SshPollValue input) { - return input.getExitStatus(); - } - }; - } - - public static Function<SshPollValue, String> stdout() { - return new Function<SshPollValue, String>() { - @Override public String apply(SshPollValue input) { - return input.getStdout(); - } - }; - } - - public static Function<SshPollValue, String> stderr() { - return new Function<SshPollValue, String>() { - @Override public String apply(SshPollValue input) { - return input.getStderr(); - } - }; - } - - public static Function<SshPollValue, Boolean> exitStatusEquals(final int expected) { - return chain(SshValueFunctions.exitStatus(), Functions.forPredicate(Predicates.equalTo(expected))); - } - - // TODO Do we want these chain methods? Does guava have them already? Duplicated in HttpValueFunctions. - public static <A,B,C> Function<A,C> chain(final Function<A,? 
extends B> f1, final Function<B,C> f2) { - return new Function<A,C>() { - @Override public C apply(@Nullable A input) { - return f2.apply(f1.apply(input)); - } - }; - } - - public static <A,B,C,D> Function<A,D> chain(final Function<A,? extends B> f1, final Function<B,? extends C> f2, final Function<C,D> f3) { - return new Function<A,D>() { - @Override public D apply(@Nullable A input) { - return f3.apply(f2.apply(f1.apply(input))); - } - }; - } -}
