http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/http/HttpTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/http/HttpTool.java b/core/src/main/java/org/apache/brooklyn/core/util/http/HttpTool.java new file mode 100644 index 0000000..43b1aee --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/http/HttpTool.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.http; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.net.URI; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.codec.binary.Base64; +import org.apache.http.ConnectionReuseStrategy; +import org.apache.http.HttpEntity; +import org.apache.http.HttpRequest; +import org.apache.http.HttpResponse; +import org.apache.http.NameValuePair; +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.Credentials; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.HttpClient; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.conn.ClientConnectionManager; +import org.apache.http.conn.scheme.Scheme; +import org.apache.http.conn.scheme.SchemeSocketFactory; +import org.apache.http.conn.ssl.SSLSocketFactory; +import org.apache.http.conn.ssl.TrustSelfSignedStrategy; +import org.apache.http.conn.ssl.TrustStrategy; +import org.apache.http.conn.ssl.X509HostnameVerifier; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.client.LaxRedirectStrategy; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.params.BasicHttpParams; +import org.apache.http.params.HttpConnectionParams; +import org.apache.http.params.HttpParams; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.util.crypto.SslTrustUtils; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.net.URLParamEncoder; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; + +public class HttpTool { + + private static final Logger LOG = LoggerFactory.getLogger(HttpTool.class); + + /** Apache HTTP commons utility for trusting all. + * <p> + * For generic java HTTP usage, see {@link SslTrustUtils#trustAll(java.net.URLConnection)} + * and static constants in the same class. */ + public static class TrustAllStrategy implements TrustStrategy { + @Override + public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException { + return true; + } + } + + public static HttpClientBuilder httpClientBuilder() { + return new HttpClientBuilder(); + } + + public static class HttpClientBuilder { + private ClientConnectionManager clientConnectionManager; + private HttpParams httpParams; + private URI uri; + private Integer port; + private Credentials credentials; + private boolean laxRedirect; + private Boolean https; + private SchemeSocketFactory socketFactory; + private ConnectionReuseStrategy reuseStrategy; + private boolean trustAll; + private boolean trustSelfSigned; + + public HttpClientBuilder clientConnectionManager(ClientConnectionManager val) { + this.clientConnectionManager = checkNotNull(val, "clientConnectionManager"); + return this; + } + public HttpClientBuilder httpParams(HttpParams val) { + checkState(httpParams == null, "Must not call httpParams multiple times, or after other methods like connectionTimeout"); + this.httpParams = checkNotNull(val, "httpParams"); + return this; + } + public HttpClientBuilder connectionTimeout(Duration val) { + if (httpParams == null) httpParams = new BasicHttpParams(); + long millis = checkNotNull(val, "connectionTimeout").toMilliseconds(); + if (millis > Integer.MAX_VALUE) throw new IllegalStateException("HttpClient only accepts upto max-int millis for connectionTimeout, but given "+val); + HttpConnectionParams.setConnectionTimeout(httpParams, (int) millis); + return this; + } + public HttpClientBuilder socketTimeout(Duration val) { + if (httpParams == null) httpParams = new BasicHttpParams(); + long millis = checkNotNull(val, "socketTimeout").toMilliseconds(); + if (millis > Integer.MAX_VALUE) throw new IllegalStateException("HttpClient only accepts upto max-int millis for socketTimeout, but given "+val); + HttpConnectionParams.setSoTimeout(httpParams, (int) millis); + return this; + } + public HttpClientBuilder reuseStrategy(ConnectionReuseStrategy val) { + this.reuseStrategy = checkNotNull(val, "reuseStrategy"); + return this; + } + public HttpClientBuilder uri(String val) { + return uri(URI.create(checkNotNull(val, "uri"))); + } + public HttpClientBuilder uri(URI val) { + this.uri = checkNotNull(val, "uri"); + if (https == null) https = ("https".equalsIgnoreCase(uri.getScheme())); + return this; + } + public HttpClientBuilder port(int val) { + this.port = val; + return this; + } + public HttpClientBuilder credentials(Credentials val) { + this.credentials = checkNotNull(val, "credentials"); + return this; + } + public void credential(Optional<Credentials> val) { + if (val.isPresent()) credentials = val.get(); + } + /** similar to curl --post301 -L` */ + public HttpClientBuilder laxRedirect(boolean val) { + this.laxRedirect = val; + return this; + } + public HttpClientBuilder https(boolean val) { + this.https = val; + return this; + } + public HttpClientBuilder socketFactory(SchemeSocketFactory val) { + this.socketFactory = checkNotNull(val, "socketFactory"); + return this; + } + public HttpClientBuilder trustAll() { + this.trustAll = true; + return this; + } + public HttpClientBuilder trustSelfSigned() { + this.trustSelfSigned = true; + return this; + } + public HttpClient build() { + final DefaultHttpClient httpClient = new DefaultHttpClient(clientConnectionManager); + httpClient.setParams(httpParams); + + // support redirects for POST (similar to `curl --post301 -L`) + // http://stackoverflow.com/questions/3658721/httpclient-4-error-302-how-to-redirect + if (laxRedirect) { + httpClient.setRedirectStrategy(new LaxRedirectStrategy()); + } + if (reuseStrategy != null) { + httpClient.setReuseStrategy(reuseStrategy); + } + if (https == Boolean.TRUE || (uri!=null && uri.toString().startsWith("https:"))) { + try { + if (port == null) { + port = (uri != null && uri.getPort() >= 0) ? uri.getPort() : 443; + } + if (socketFactory == null) { + if (trustAll) { + TrustStrategy trustStrategy = new TrustAllStrategy(); + X509HostnameVerifier hostnameVerifier = SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER; + socketFactory = new SSLSocketFactory(trustStrategy, hostnameVerifier); + } else if (trustSelfSigned) { + TrustStrategy trustStrategy = new TrustSelfSignedStrategy(); + X509HostnameVerifier hostnameVerifier = SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER; + socketFactory = new SSLSocketFactory(trustStrategy, hostnameVerifier); + } else { + // Using default https scheme: based on default java truststore, which is pretty strict! + } + } + if (socketFactory != null) { + Scheme sch = new Scheme("https", port, socketFactory); + httpClient.getConnectionManager().getSchemeRegistry().register(sch); + } + } catch (Exception e) { + LOG.warn("Error setting trust for uri {}", uri); + throw Exceptions.propagate(e); + } + } + + // Set credentials + if (uri != null && credentials != null) { + String hostname = uri.getHost(); + int port = uri.getPort(); + httpClient.getCredentialsProvider().setCredentials(new AuthScope(hostname, port), credentials); + } + if (uri==null && credentials!=null) { + LOG.warn("credentials have no effect in builder unless URI for host is specified"); + } + + return httpClient; + } + } + + protected static abstract class HttpRequestBuilder<B extends HttpRequestBuilder<B, R>, R extends HttpRequest> { + protected R req; + + protected HttpRequestBuilder(R req) { + this.req = req; + } + @SuppressWarnings("unchecked") + protected B self() { + return (B) this; + } + public B headers(Map<String,String> headers) { + if (headers!=null) { + for (Map.Entry<String,String> entry : headers.entrySet()) { + req.addHeader(entry.getKey(), entry.getValue()); + } + } + return self(); + } + public B headers(Multimap<String,String> headers) { + if (headers!=null) { + for (Map.Entry<String,String> entry : headers.entries()) { + req.addHeader(entry.getKey(), entry.getValue()); + } + } + return self(); + } + public R build() { + return req; + } + } + + protected static abstract class HttpEntityEnclosingRequestBaseBuilder<B extends HttpEntityEnclosingRequestBaseBuilder<B,R>, R extends HttpEntityEnclosingRequestBase> extends HttpRequestBuilder<B, R> { + protected HttpEntityEnclosingRequestBaseBuilder(R req) { + super(req); + } + public B body(byte[] body) { + if (body != null) { + HttpEntity httpEntity = new ByteArrayEntity(body); + req.setEntity(httpEntity); + } + return self(); + } + } + + public static class HttpGetBuilder extends HttpRequestBuilder<HttpGetBuilder, HttpGet> { + public HttpGetBuilder(URI uri) { + super(new HttpGet(uri)); + } + } + + public static class HttpHeadBuilder extends HttpRequestBuilder<HttpHeadBuilder, HttpHead> { + public HttpHeadBuilder(URI uri) { + super(new HttpHead(uri)); + } + } + + public static class HttpDeleteBuilder extends HttpRequestBuilder<HttpDeleteBuilder, HttpDelete> { + public HttpDeleteBuilder(URI uri) { + super(new HttpDelete(uri)); + } + } + + public static class HttpPostBuilder extends HttpEntityEnclosingRequestBaseBuilder<HttpPostBuilder, HttpPost> { + HttpPostBuilder(URI uri) { + super(new HttpPost(uri)); + } + } + + public static class HttpFormPostBuilder extends HttpRequestBuilder<HttpFormPostBuilder, HttpPost> { + HttpFormPostBuilder(URI uri) { + super(new HttpPost(uri)); + } + + public HttpFormPostBuilder params(Map<String, String> params) { + if (params != null) { + Collection<NameValuePair> httpParams = new ArrayList<NameValuePair>(params.size()); + for (Entry<String, String> param : params.entrySet()) { + httpParams.add(new BasicNameValuePair(param.getKey(), param.getValue())); + } + req.setEntity(new UrlEncodedFormEntity(httpParams)); + } + return self(); + } + } + + public static class HttpPutBuilder extends HttpEntityEnclosingRequestBaseBuilder<HttpPutBuilder, HttpPut> { + public HttpPutBuilder(URI uri) { + super(new HttpPut(uri)); + } + } + + public static HttpToolResponse httpGet(HttpClient httpClient, URI uri, Map<String,String> headers) { + HttpGet req = new HttpGetBuilder(uri).headers(headers).build(); + return execAndConsume(httpClient, req); + } + + public static HttpToolResponse httpPost(HttpClient httpClient, URI uri, Map<String,String> headers, byte[] body) { + HttpPost req = new HttpPostBuilder(uri).headers(headers).body(body).build(); + return execAndConsume(httpClient, req); + } + + public static HttpToolResponse httpPut(HttpClient httpClient, URI uri, Map<String, String> headers, byte[] body) { + HttpPut req = new HttpPutBuilder(uri).headers(headers).body(body).build(); + return execAndConsume(httpClient, req); + } + + public static HttpToolResponse httpPost(HttpClient httpClient, URI uri, Map<String,String> headers, Map<String, String> params) { + HttpPost req = new HttpFormPostBuilder(uri).headers(headers).params(params).build(); + return execAndConsume(httpClient, req); + } + + public static HttpToolResponse httpDelete(HttpClient httpClient, URI uri, Map<String,String> headers) { + HttpDelete req = new HttpDeleteBuilder(uri).headers(headers).build(); + return execAndConsume(httpClient, req); + } + + public static HttpToolResponse httpHead(HttpClient httpClient, URI uri, Map<String,String> headers) { + HttpHead req = new HttpHeadBuilder(uri).headers(headers).build(); + return execAndConsume(httpClient, req); + } + + public static HttpToolResponse execAndConsume(HttpClient httpClient, HttpUriRequest req) { + long startTime = System.currentTimeMillis(); + try { + HttpResponse httpResponse = httpClient.execute(req); + + try { + return new HttpToolResponse(httpResponse, startTime); + } finally { + EntityUtils.consume(httpResponse.getEntity()); + } + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + + public static boolean isStatusCodeHealthy(int code) { return (code>=200 && code<=299); } + + public static String toBasicAuthorizationValue(UsernamePasswordCredentials credentials) { + return "Basic "+Base64.encodeBase64String( (credentials.getUserName()+":"+credentials.getPassword()).getBytes() ); + } + + public static String encodeUrlParams(Map<?,?> data) { + if (data==null) return ""; + Iterable<String> args = Iterables.transform(data.entrySet(), + new Function<Map.Entry<?,?>,String>() { + @Override public String apply(Map.Entry<?,?> entry) { + Object k = entry.getKey(); + Object v = entry.getValue(); + return URLParamEncoder.encode(Strings.toString(k)) + (v != null ? "=" + URLParamEncoder.encode(Strings.toString(v)) : ""); + } + }); + return Joiner.on("&").join(args); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/http/HttpToolResponse.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/http/HttpToolResponse.java b/core/src/main/java/org/apache/brooklyn/core/util/http/HttpToolResponse.java new file mode 100644 index 0000000..97e7793 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/http/HttpToolResponse.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.http; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.http.Header; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.event.feed.http.HttpPollValue; +import brooklyn.util.guava.Maybe; +import brooklyn.util.stream.Streams; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Objects; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.common.io.ByteStreams; + +public class HttpToolResponse implements HttpPollValue { + + private static final Logger log = LoggerFactory.getLogger(HttpToolResponse.class); + + private final Object mutex = new Object(); + private final HttpResponse response; + private final long startTime; + private final long durationMillisOfFirstResponse; + private final long durationMillisOfFullContent; + private int responseCode; + private String reasonPhrase; + private Map<String,List<String>> headerLists; + private byte[] content; + + + public HttpToolResponse(HttpResponse response, long startTime) { + this.response = response; + this.startTime = startTime; + + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + HttpEntity entity = response.getEntity(); + if (entity != null) { + entity.getContentLength(); + durationMillisOfFirstResponse = Duration.sinceUtc(startTime).toMilliseconds(); + + ByteStreams.copy(entity.getContent(), out); + content = out.toByteArray(); + + entity.getContentLength(); + } else { + durationMillisOfFirstResponse = Duration.sinceUtc(startTime).toMilliseconds(); + content = new byte[0]; + } + durationMillisOfFullContent = Duration.sinceUtc(startTime).toMilliseconds(); + if (log.isTraceEnabled()) + log.trace("HttpPollValue latency "+Time.makeTimeStringRounded(durationMillisOfFirstResponse)+" / "+Time.makeTimeStringRounded(durationMillisOfFullContent)+", content size "+content.length); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + + public HttpToolResponse(int responseCode, Map<String,? extends List<String>> headers, byte[] content, + long startTime, long durationMillisOfFirstResponse, long durationMillisOfFullContent) { + this.response = null; + this.responseCode = responseCode; + this.headerLists = ImmutableMap.copyOf(headers); + this.content = content; + this.startTime = startTime; + this.durationMillisOfFirstResponse = durationMillisOfFirstResponse; + this.durationMillisOfFullContent = durationMillisOfFullContent; + } + + public int getResponseCode() { + synchronized (mutex) { + if (responseCode == 0) { + responseCode = response.getStatusLine().getStatusCode(); + } + } + return responseCode; + } + + public String getReasonPhrase() { + synchronized (mutex) { + if (reasonPhrase == null) { + reasonPhrase = response.getStatusLine().getReasonPhrase(); + } + } + return reasonPhrase; + } + + /** returns the timestamp (millis since 1970) when this request was started */ + public long getStartTime() { + return startTime; + } + + /** returns latency, in milliseconds, if value was initialized with a start time */ + public long getLatencyFullContent() { + return durationMillisOfFullContent; + } + + /** returns latency, in milliseconds, before response started coming in */ + public long getLatencyFirstResponse() { + return durationMillisOfFirstResponse; + } + + public Map<String, List<String>> getHeaderLists() { + synchronized (mutex) { + if (headerLists == null) { + Map<String, List<String>> headerListsMutable = Maps.newLinkedHashMap(); + for (Header header : response.getAllHeaders()) { + List<String> vals = headerListsMutable.get(header.getName()); + if (vals == null) { + vals = new ArrayList<String>(); + headerListsMutable.put(header.getName(), vals); + } + vals.add(header.getValue()); + } + headerLists = Collections.unmodifiableMap(headerListsMutable); + } + } + return headerLists; + } + + public byte[] getContent() { + synchronized (mutex) { + if (content == null) { + InputStream in = null; + try { + in = response.getEntity().getContent(); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteStreams.copy(in, out); + content = out.toByteArray(); + } catch (IOException e) { + throw Throwables.propagate(e); + } finally { + Streams.closeQuietly(in); + } + } + } + return content; + } + + public String getContentAsString() { + return new String(getContent()); + } + + public Maybe<HttpResponse> getResponse() { + return Maybe.fromNullable(response); + } + + @Override + public String toString() { + return Objects.toStringHelper(getClass()) + .add("responseCode", responseCode) + .toString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/internal/ConfigKeySelfExtracting.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/internal/ConfigKeySelfExtracting.java b/core/src/main/java/org/apache/brooklyn/core/util/internal/ConfigKeySelfExtracting.java new file mode 100644 index 0000000..a1d85ca --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/internal/ConfigKeySelfExtracting.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.internal; + +import java.util.Map; + +import org.apache.brooklyn.api.management.ExecutionContext; + +import brooklyn.config.ConfigKey; + +/** Interface for resolving key values; typically implemented by the config key, + * but discouraged for external usage. + */ +public interface ConfigKeySelfExtracting<T> extends ConfigKey<T> { + + /** + * Extracts the value for this config key from the given map. + */ + T extractValue(Map<?,?> configMap, ExecutionContext exec); + + /** + * @return True if there is an entry in the configMap that could be extracted + */ + boolean isSet(Map<?,?> configMap); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/internal/Repeater.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/internal/Repeater.java b/core/src/main/java/org/apache/brooklyn/core/util/internal/Repeater.java new file mode 100644 index 0000000..39e79da --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/internal/Repeater.java @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.internal; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.core.util.flags.FlagUtils; +import org.apache.brooklyn.core.util.flags.SetFromFlag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.util.JavaGroovyEquivalents; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.internal.TimeExtras; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Callables; + +/** + * Simple DSL to repeat a fragment of code periodically until a condition is satisfied. + * + * In its simplest case, it is passed two {@link groovy.lang.Closure}s / {@link Callable} - + * the first is executed, then the second. If the second closure returns false, the loop + * is repeated; if true, it finishes. Further customization can be applied to set the period + * between loops and place a maximum limit on how long the loop should run for. + * <p> + * It is configured in a <em>fluent</em> manner. For example, in Groovy: + * <pre> + * {@code + * Repeater.create("Wait until the Frobnitzer is ready") + * .repeat { + * status = frobnitzer.getStatus() + * } + * .until { + * status == "Ready" || status == "Failed" + * } + * .limitIterationsTo(30) + * .run() + * } + * </pre> + * + * Or in Java: + * <pre> + * {@code + * Repeater.create("Wait until the Frobnitzer is ready") + * .until(new Callable<Boolean>() { + * public Boolean call() { + * String status = frobnitzer.getStatus() + * return "Ready".equals(status) || "Failed".equals(status); + * }}) + * .limitIterationsTo(30) + * .run() + * } + * </pre> + * + * @deprecated since 0.7.0, use {@link brooklyn.util.repeat.Repeater} instead + */ +@Deprecated +public class Repeater { + + // TODO Was converted to Java, from groovy. Needs thorough review and improvements + // to use idiomatic java + + private static final Logger log = LoggerFactory.getLogger(Repeater.class); + + static { TimeExtras.init(); } + + @SetFromFlag + private String description; + private Callable<?> body = Callables.returning(null); + private Callable<Boolean> exitCondition; + @SetFromFlag + private Long period = null; + @SetFromFlag("timeout") + private Long durationLimit = null; + private int iterationLimit = 0; + private boolean rethrowException = false; + private boolean rethrowExceptionImmediately = false; + private boolean warnOnUnRethrownException = true; + + public Repeater() { + this(MutableMap.of(), null); + } + + public Repeater(Map<?,?> flags) { + this(flags, null); + } + + public Repeater(String description) { + this(MutableMap.of(), description); + } + + /** + * Construct a new instance of Repeater. + * + * @param flags can include period, timeout, description + * @param description a description of the operation that will appear in debug logs. + */ + public Repeater(Map<?,?> flags, String description) { + setFromFlags(flags); + this.description = JavaGroovyEquivalents.elvis(description, this.description, "Repeater"); + } + + public void setFromFlags(Map<?,?> flags) { + FlagUtils.setFieldsFromFlags(flags, this); + } + + public static Repeater create() { + return create(MutableMap.of()); + } + public static Repeater create(Map<?,?> flags) { + return create(flags, null); + } + public static Repeater create(String description) { + return create(MutableMap.of(), description); + } + public static Repeater create(Map<?,?> flags, String description) { + return new Repeater(flags, description); + } + + /** + * Sets the main body of the loop to be a no-op. + * + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater repeat() { + return repeat(Callables.returning(null)); + } + + /** + * Sets the main body of the loop. + * + * @param body a closure or other Runnable that is executed in the main body of the loop. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater repeat(Runnable body) { + checkNotNull(body, "body must not be null"); + this.body = (body instanceof Callable) ? (Callable<?>)body : Executors.callable(body); + return this; + } + + /** + * Sets the main body of the loop. + * + * @param body a closure or other Callable that is executed in the main body of the loop. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater repeat(Callable<?> body) { + checkNotNull(body, "body must not be null"); + this.body = body; + return this; + } + + /** + * Set how long to wait between loop iterations. + * + * @param period how long to wait between loop iterations. + * @param unit the unit of measurement of the period. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater every(long period, TimeUnit unit) { + Preconditions.checkArgument(period > 0, "period must be positive: %s", period); + checkNotNull(unit, "unit must not be null"); + this.period = unit.toMillis(period); + return this; + } + + /** + * @see #every(long, TimeUnit) + */ + public Repeater every(Duration duration) { + Preconditions.checkNotNull(duration, "duration must not be null"); + Preconditions.checkArgument(duration.toMilliseconds()>0, "period must be positive: %s", duration); + this.period = duration.toMilliseconds(); + return this; + } + + public Repeater every(groovy.time.Duration duration) { + return every(Duration.of(duration)); + } + + /** + * @see #every(long, TimeUnit) + * @deprecated specify unit + */ + public Repeater every(long duration) { + return every(duration, TimeUnit.MILLISECONDS); + } + + /** + * Set code fragment that tests if the loop has completed. + * + * @param exitCondition a closure or other Callable that returns a boolean. If this code returns {@literal true} then the + * loop will stop executing. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater until(Callable<Boolean> exitCondition) { + Preconditions.checkNotNull(exitCondition, "exitCondition must not be null"); + this.exitCondition = exitCondition; + return this; + } + + /** + * If the exit condition check throws an exception, it will be recorded and the last exception will be thrown on failure. + * + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater rethrowException() { + this.rethrowException = true; + return this; + } + + /** + * If the repeated body or the exit condition check throws an exception, then propagate that exception immediately. + * + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater rethrowExceptionImmediately() { + this.rethrowExceptionImmediately = true; + return this; + } + + public Repeater suppressWarnings() { + this.warnOnUnRethrownException = false; + return this; + } + + /** + * Set the maximum number of iterations. + * + * The loop will exit if the condition has not been satisfied after this number of iterations. + * + * @param iterationLimit the maximum number of iterations. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater limitIterationsTo(int iterationLimit) { + Preconditions.checkArgument(iterationLimit > 0, "iterationLimit must be positive: %s", iterationLimit); + this.iterationLimit = iterationLimit; + return this; + } + + /** + * Set the amount of time to wait for the condition. + * The repeater will wait at least this long for the condition to be true, + * and will exit soon after even if the condition is false. + * + * @param deadline the time that the loop should wait. + * @param unit the unit of measurement of the period. + * @return {@literal this} to aid coding in a fluent style. + */ + public Repeater limitTimeTo(long deadline, TimeUnit unit) { + Preconditions.checkArgument(deadline > 0, "deadline must be positive: %s", deadline); + Preconditions.checkNotNull(unit, "unit must not be null"); + this.durationLimit = unit.toMillis(deadline); + return this; + } + + /** + * @see #limitTimeTo(long, TimeUnit) + */ + public Repeater limitTimeTo(Duration duration) { + Preconditions.checkNotNull(duration, "duration must not be null"); + Preconditions.checkArgument(duration.toMilliseconds() > 0, "deadline must be positive: %s", duration); + this.durationLimit = duration.toMilliseconds(); + return this; + } + + /** + * Run the loop. + * + * @return true if the exit condition was satisfied; false if the loop terminated for any other reason. + */ + public boolean run() { + Preconditions.checkState(body != null, "repeat() method has not been called to set the body"); + Preconditions.checkState(exitCondition != null, "until() method has not been called to set the exit condition"); + Preconditions.checkState(period != null, "every() method has not been called to set the loop period time units"); + + Throwable lastError = null; + int iterations = 0; + long endTime = -1; + if (durationLimit != null) { + endTime = System.currentTimeMillis() + durationLimit; + } + + while (true) { + iterations++; + + try { + body.call(); + } catch (Exception e) { + log.warn(description, e); + if (rethrowExceptionImmediately) throw Exceptions.propagate(e); + } + + boolean done = false; + try { + lastError = null; + done = exitCondition.call(); + } catch (Exception e) { + if (log.isDebugEnabled()) log.debug(description, e); + lastError = e; + if (rethrowExceptionImmediately) throw Exceptions.propagate(e); + } + if (done) { + if (log.isDebugEnabled()) log.debug("{}: condition satisfied", description); + return true; + } else { + if (log.isDebugEnabled()) { + String msg = String.format("%s: unsatisfied during iteration %s %s", description, iterations, + (iterationLimit > 0 ? "(max "+iterationLimit+" attempts)" : "") + + (endTime > 0 ? "("+Time.makeTimeStringRounded(endTime - System.currentTimeMillis())+" remaining)" : "")); + if (iterations == 1) { + log.debug(msg); + } else { + log.trace(msg); + } + } + } + + if (iterationLimit > 0 && iterations == iterationLimit) { + if (log.isDebugEnabled()) log.debug("{}: condition not satisfied and exceeded iteration limit", description); + if (rethrowException && lastError != null) { + log.warn("{}: error caught checking condition (rethrowing): {}", description, lastError.getMessage()); + throw Exceptions.propagate(lastError); + } + if (warnOnUnRethrownException && lastError != null) + log.warn("{}: error caught checking condition: {}", description, lastError.getMessage()); + return false; + } + + if (endTime > 0) { + if (System.currentTimeMillis() > endTime) { + if (log.isDebugEnabled()) log.debug("{}: condition not satisfied and deadline {} passed", + description, Time.makeTimeStringRounded(endTime - System.currentTimeMillis())); + if (rethrowException && lastError != null) { + log.error("{}: error caught checking condition: {}", description, lastError.getMessage()); + throw Exceptions.propagate(lastError); + } + return false; + } + } + + Time.sleep(period); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/BackoffLimitedRetryHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/BackoffLimitedRetryHandler.java b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/BackoffLimitedRetryHandler.java new file mode 100644 index 0000000..b8d6eac --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/BackoffLimitedRetryHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.internal.ssh; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.util.exceptions.Exceptions; + +/** + * Allow replayable request to be retried a limited number of times, and impose an exponential back-off + * delay before returning. + * <p> + * Copied and modified from jclouds; original author was James Murty + */ +public class BackoffLimitedRetryHandler { + + private static final Logger LOG = LoggerFactory.getLogger(BackoffLimitedRetryHandler.class); + + private final int retryCountLimit; + + private final long delayStart; + + public BackoffLimitedRetryHandler() { + this(5, 50L); + } + + public BackoffLimitedRetryHandler(int retryCountLimit, long delayStart) { + this.retryCountLimit = retryCountLimit; + this.delayStart = delayStart; + } + + public void imposeBackoffExponentialDelay(int failureCount, String commandDescription) { + imposeBackoffExponentialDelay(delayStart, 2, failureCount, retryCountLimit, commandDescription); + } + + public void imposeBackoffExponentialDelay(long period, int pow, int failureCount, int max, String commandDescription) { + imposeBackoffExponentialDelay(period, period * 10l, pow, failureCount, max, commandDescription); + } + + public void imposeBackoffExponentialDelay(long period, + long maxPeriod, + int pow, + int failureCount, + int max, + String commandDescription) { + long delayMs = (long) (period * Math.pow(failureCount, pow)); + delayMs = (delayMs > maxPeriod) ? maxPeriod : delayMs; + if (LOG.isDebugEnabled()) LOG.debug("Retry {}/{}: delaying for {} ms: {}", + new Object[] {failureCount, max, delayMs, commandDescription}); + try { + Thread.sleep(delayMs); + } catch (InterruptedException e) { + Exceptions.propagate(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/ShellAbstractTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/ShellAbstractTool.java b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/ShellAbstractTool.java new file mode 100644 index 0000000..90dcffa --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/ShellAbstractTool.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.internal.ssh; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.brooklyn.core.util.flags.TypeCoercions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.util.collections.MutableList; +import brooklyn.util.os.Os; +import brooklyn.util.ssh.BashCommands; +import brooklyn.util.text.Identifiers; +import brooklyn.util.text.StringEscapes.BashStringEscapes; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.base.Joiner; +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; + +public abstract class ShellAbstractTool implements ShellTool { + + private static final Logger LOG = LoggerFactory.getLogger(ShellAbstractTool.class); + + protected final File localTempDir; + + public ShellAbstractTool(String localTempDir) { + this(localTempDir == null ? null : new File(Os.tidyPath(localTempDir))); + } + + public ShellAbstractTool(File localTempDir) { + if (localTempDir == null) { + localTempDir = new File(Os.tmp(), "tmpssh-"+Os.user()); + if (!localTempDir.exists()) localTempDir.mkdir(); + Os.deleteOnExitEmptyParentsUpTo(localTempDir, new File(Os.tmp())); + } + this.localTempDir = localTempDir; + } + + public ShellAbstractTool() { + this((File)null); + } + + protected static void warnOnDeprecated(Map<String, ?> props, String deprecatedKey, String correctKey) { + if (props.containsKey(deprecatedKey)) { + if (correctKey != null && props.containsKey(correctKey)) { + Object dv = props.get(deprecatedKey); + Object cv = props.get(correctKey); + if (!Objects.equal(cv, dv)) { + LOG.warn("SshTool detected deprecated key '"+deprecatedKey+"' with different value ("+dv+") "+ + "than new key '"+correctKey+"' ("+cv+"); ambiguous which will be used"); + } else { + // ignore, the deprecated key populated for legacy reasons + } + } else { + Object dv = props.get(deprecatedKey); + LOG.warn("SshTool detected deprecated key '"+deprecatedKey+"' used, with value ("+dv+")"); + } + } + } + + protected static Boolean hasVal(Map<String,?> map, ConfigKey<?> keyC) { + String key = keyC.getName(); + return map.containsKey(key); + } + + protected static <T> T getMandatoryVal(Map<String,?> map, ConfigKey<T> keyC) { + String key = keyC.getName(); + checkArgument(map.containsKey(key), "must contain key '"+keyC+"'"); + return TypeCoercions.coerce(map.get(key), keyC.getTypeToken()); + } + + public static <T> T getOptionalVal(Map<String,?> map, ConfigKey<T> keyC) { + if (keyC==null) return null; + String key = keyC.getName(); + if (map!=null && map.containsKey(key) && map.get(key) != null) { + return TypeCoercions.coerce(map.get(key), keyC.getTypeToken()); + } else { + return keyC.getDefaultValue(); + } + } + + /** returns the value of the key if specified, otherwise defaultValue */ + protected static <T> T getOptionalVal(Map<String,?> map, ConfigKey<T> keyC, T defaultValue) { + String key = keyC.getName(); + if (map!=null && map.containsKey(key) && map.get(key) != null) { + return TypeCoercions.coerce(map.get(key), keyC.getTypeToken()); + } else { + return defaultValue; + } + } + + protected void closeWhispering(Closeable closeable, Object context) { + closeWhispering(closeable, this, context); + } + + /** + * Similar to Guava's Closeables.closeQuitely, except logs exception at debug with context in message. + */ + protected static void closeWhispering(Closeable closeable, Object context1, Object context2) { + if (closeable != null) { + try { + closeable.close(); + } catch (IOException e) { + if (LOG.isDebugEnabled()) { + String msg = String.format("<< exception during close, for %s -> %s (%s); continuing.", + context1, context2, closeable); + if (LOG.isTraceEnabled()) + LOG.debug(msg + ": " + e); + else + LOG.trace(msg, e); + } + } + } + } + + protected File writeTempFile(InputStream contents) { + File tempFile = Os.writeToTempFile(contents, localTempDir, "sshcopy", "data"); + tempFile.setReadable(false, false); + tempFile.setReadable(true, true); + tempFile.setWritable(false); + tempFile.setExecutable(false); + return tempFile; + } + + protected File writeTempFile(String contents) { + return writeTempFile(contents.getBytes()); + } + + protected File writeTempFile(byte[] contents) { + return writeTempFile(new ByteArrayInputStream(contents)); + } + + protected String toScript(Map<String,?> props, List<String> commands, Map<String,?> env) { + List<String> allcmds = toCommandSequence(commands, env); + StringBuilder result = new StringBuilder(); + result.append(getOptionalVal(props, PROP_SCRIPT_HEADER)).append('\n'); + + for (String cmd : allcmds) { + result.append(cmd).append('\n'); + } + + return result.toString(); + } + + /** + * Merges the commands and env, into a single set of commands. Also escapes the commands as required. + * + * Not all ssh servers handle "env", so instead convert env into exported variables + */ + protected List<String> toCommandSequence(List<String> commands, Map<String,?> env) { + List<String> result = new ArrayList<String>((env!=null ? env.size() : 0) + commands.size()); + + if (env!=null) { + for (Entry<String,?> entry : env.entrySet()) { + if (entry.getKey() == null || entry.getValue() == null) { + LOG.warn("env key-values must not be null; ignoring: key="+entry.getKey()+"; value="+entry.getValue()); + continue; + } + String escapedVal = BashStringEscapes.escapeLiteralForDoubleQuotedBash(entry.getValue().toString()); + result.add("export "+entry.getKey()+"=\""+escapedVal+"\""); + } + } + for (CharSequence cmd : commands) { // objects in commands can be groovy GString so can't treat as String here + result.add(cmd.toString()); + } + + return result; + } + + @Override + public int execScript(Map<String,?> props, List<String> commands) { + return execScript(props, commands, Collections.<String,Object>emptyMap()); + } + + @Override + public int execCommands(Map<String,?> props, List<String> commands) { + return execCommands(props, commands, Collections.<String,Object>emptyMap()); + } + + protected static int asInt(Integer input, int valueIfInputNull) { + return input != null ? input : valueIfInputNull; + } + + protected abstract class ToolAbstractExecScript { + protected final Map<String, ?> props; + protected final String separator; + protected final OutputStream out; + protected final OutputStream err; + protected final String scriptDir; + protected final Boolean runAsRoot; + protected final Boolean noExtraOutput; + protected final Boolean noDeleteAfterExec; + protected final String scriptNameWithoutExtension; + protected final String scriptPath; + protected final Duration execTimeout; + + public ToolAbstractExecScript(Map<String,?> props) { + this.props = props; + this.separator = getOptionalVal(props, PROP_SEPARATOR); + this.out = getOptionalVal(props, PROP_OUT_STREAM); + this.err = getOptionalVal(props, PROP_ERR_STREAM); + + this.scriptDir = getOptionalVal(props, PROP_SCRIPT_DIR); + this.runAsRoot = getOptionalVal(props, PROP_RUN_AS_ROOT); + this.noExtraOutput = getOptionalVal(props, PROP_NO_EXTRA_OUTPUT); + this.noDeleteAfterExec = getOptionalVal(props, PROP_NO_DELETE_SCRIPT); + this.execTimeout = getOptionalVal(props, PROP_EXEC_TIMEOUT); + + String summary = getOptionalVal(props, PROP_SUMMARY); + if (summary!=null) { + summary = Strings.makeValidFilename(summary); + if (summary.length()>30) + summary = summary.substring(0,30); + } + this.scriptNameWithoutExtension = "brooklyn-"+ + Time.makeDateStampString()+"-"+Identifiers.makeRandomId(4)+ + (Strings.isBlank(summary) ? "" : "-"+summary); + this.scriptPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension+".sh"); + } + + /** builds the command to run the given script; + * note that some modes require \$RESULT passed in order to access a variable, whereas most just need $ */ + protected List<String> buildRunScriptCommand() { + MutableList.Builder<String> cmds = MutableList.<String>builder() + .add((runAsRoot ? BashCommands.sudo(scriptPath) : scriptPath) + " < /dev/null") + .add("RESULT=$?"); + if (noExtraOutput==null || !noExtraOutput) + cmds.add("echo Executed "+scriptPath+", result $RESULT"); + if (noDeleteAfterExec!=Boolean.TRUE) { + // use "-f" because some systems have "rm" aliased to "rm -i" + // use "< /dev/null" to guarantee doesn't hang + cmds.add("rm -f "+scriptPath+" < /dev/null"); + } + cmds.add("exit $RESULT"); + return cmds.build(); + } + + protected String getSummary() { + String summary = getOptionalVal(props, PROP_SUMMARY); + return (summary != null) ? summary : scriptPath; + } + + public abstract int run(); + } + + protected abstract class ToolAbstractAsyncExecScript extends ToolAbstractExecScript { + protected final String stdoutPath; + protected final String stderrPath; + protected final String exitStatusPath; + protected final String pidPath; + + public ToolAbstractAsyncExecScript(Map<String,?> props) { + super(props); + + stdoutPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".stdout"); + stderrPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".stderr"); + exitStatusPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".exitstatus"); + pidPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension + ".pid"); + } + + /** + * Builds the command to run the given script, asynchronously. + * The executed command will return immediately, but the output from the script + * will continue to be written + * note that some modes require \$RESULT passed in order to access a variable, whereas most just need $ */ + @Override + protected List<String> buildRunScriptCommand() { + String touchCmd = String.format("touch %s %s %s %s", stdoutPath, stderrPath, exitStatusPath, pidPath); + String cmd = String.format("nohup sh -c \"( %s > %s 2> %s < /dev/null ) ; echo \\$? > %s \" > /dev/null 2>&1 < /dev/null &", scriptPath, stdoutPath, stderrPath, exitStatusPath); + MutableList.Builder<String> cmds = MutableList.<String>builder() + .add(runAsRoot ? BashCommands.sudo(touchCmd) : touchCmd) + .add(runAsRoot ? BashCommands.sudo(cmd) : cmd) + .add("echo $! > "+pidPath) + .add("RESULT=$?"); + if (noExtraOutput==null || !noExtraOutput) { + cmds.add("echo Executing async "+scriptPath); + } + cmds.add("exit $RESULT"); + return cmds.build(); + } + + /** + * Builds the command to retrieve the exit status of the command, written to stdout. + */ + protected List<String> buildRetrieveStatusCommand() { + // Retrieve exit status from file (writtent to stdout), if populated; + // if not found and pid still running, then return empty string; else exit code 1. + List<String> cmdParts = ImmutableList.of( + "# Retrieve status", // comment is to aid testing - see SshjToolAsyncStubIntegrationTest + "if test -s "+exitStatusPath+"; then", + " cat "+exitStatusPath, + "elif test -s "+pidPath+"; then", + " pid=`cat "+pidPath+"`", + " if ! ps -p $pid > /dev/null < /dev/null; then", + " # no exit status, and not executing; give a few seconds grace in case just about to write exit status", + " sleep 3", + " if test -s "+exitStatusPath+"; then", + " cat "+exitStatusPath+"", + " else", + " echo \"No exit status in "+exitStatusPath+", and pid in "+pidPath+" ($pid) not executing\"", + " exit 1", + " fi", + " fi", + "else", + " echo \"No exit status in "+exitStatusPath+", and "+pidPath+" is empty\"", + " exit 1", + "fi"+"\n"); + String cmd = Joiner.on("\n").join(cmdParts); + + MutableList.Builder<String> cmds = MutableList.<String>builder() + .add((runAsRoot ? BashCommands.sudo(cmd) : cmd)) + .add("RESULT=$?"); + cmds.add("exit $RESULT"); + return cmds.build(); + } + + /** + * Builds the command to retrieve the stdout and stderr of the async command. + * An offset can be given, to only retrieve data starting at a particular character (indexed from 0). + */ + protected List<String> buildRetrieveStdoutAndStderrCommand(int stdoutPosition, int stderrPosition) { + // Note that `tail -c +1` means start at the *first* character (i.e. start counting from 1, not 0) + String catStdoutCmd = "tail -c +"+(stdoutPosition+1)+" "+stdoutPath+" 2> /dev/null"; + String catStderrCmd = "tail -c +"+(stderrPosition+1)+" "+stderrPath+" 2>&1 > /dev/null"; + MutableList.Builder<String> cmds = MutableList.<String>builder() + .add((runAsRoot ? BashCommands.sudo(catStdoutCmd) : catStdoutCmd)) + .add((runAsRoot ? BashCommands.sudo(catStderrCmd) : catStderrCmd)) + .add("RESULT=$?"); + cmds.add("exit $RESULT"); + return cmds.build(); + } + + /** + * Builds the command to retrieve the stdout and stderr of the async command. + * An offset can be given, to only retrieve data starting at a particular character (indexed from 0). + */ + protected List<String> buildLongPollCommand(int stdoutPosition, int stderrPosition, Duration timeout) { + long maxTime = Math.max(1, timeout.toSeconds()); + + // Note that `tail -c +1` means start at the *first* character (i.e. start counting from 1, not 0) + List<String> waitForExitStatusParts = ImmutableList.of( + //Should be careful here because any output will be part of the stdout/stderr streams + "# Long poll", // comment is to aid testing - see SshjToolAsyncStubIntegrationTest + // disown to avoid Terminated message after killing the process + // redirect error output to avoid "file truncated" messages + "tail -c +"+(stdoutPosition+1)+" -f "+stdoutPath+" 2> /dev/null & export TAIL_STDOUT_PID=$!; disown", + "tail -c +"+(stderrPosition+1)+" -f "+stderrPath+" 1>&2 2> /dev/null & export TAIL_STDERR_PID=$!; disown", + "EXIT_STATUS_PATH="+exitStatusPath, + "PID_PATH="+pidPath, + "MAX_TIME="+maxTime, + "COUNTER=0", + "while [ \"$COUNTER\" -lt $MAX_TIME ]; do", + " if test -s $EXIT_STATUS_PATH; then", + " EXIT_STATUS=`cat $EXIT_STATUS_PATH`", + " kill ${TAIL_STDERR_PID} ${TAIL_STDOUT_PID} 2> /dev/null", + " exit $EXIT_STATUS", + " elif test -s $PID_PATH; then", + " PID=`cat $PID_PATH`", + " if ! ps -p $PID > /dev/null 2>&1 < /dev/null; then", + " # no exit status, and not executing; give a few seconds grace in case just about to write exit status", + " sleep 3", + " if test -s $EXIT_STATUS_PATH; then", + " EXIT_STATUS=`cat $EXIT_STATUS_PATH`", + " kill ${TAIL_STDERR_PID} ${TAIL_STDOUT_PID} 2> /dev/null", + " exit $EXIT_STATUS", + " else", + " echo \"No exit status in $EXIT_STATUS_PATH, and pid in $PID_PATH ($PID) not executing\"", + " kill ${TAIL_STDERR_PID} ${TAIL_STDOUT_PID} 2> /dev/null", + " exit 126", + " fi", + " fi", + " fi", + " # No exit status in $EXIT_STATUS_PATH; keep waiting", + " sleep 1", + " COUNTER+=1", + "done", + "kill ${TAIL_STDERR_PID} ${TAIL_STDOUT_PID} 2> /dev/null", + "exit 125"+"\n"); + String waitForExitStatus = Joiner.on("\n").join(waitForExitStatusParts); + + return ImmutableList.of(runAsRoot ? BashCommands.sudo(waitForExitStatus) : waitForExitStatus); + } + + protected List<String> deleteTemporaryFilesCommand() { + ImmutableList.Builder<String> cmdParts = ImmutableList.builder(); + + if (!Boolean.TRUE.equals(noDeleteAfterExec)) { + // use "-f" because some systems have "rm" aliased to "rm -i" + // use "< /dev/null" to guarantee doesn't hang + cmdParts.add( + "rm -f "+scriptPath+" "+stdoutPath+" "+stderrPath+" "+exitStatusPath+" "+pidPath+" < /dev/null"); + } + + // If the buildLongPollCommand didn't complete properly then it might have left tail command running; + // ensure they are killed. + cmdParts.add( + //ignore error output for the case where there are no running processes and kill is called without arguments + "ps aux | grep \"tail -c\" | grep \""+stdoutPath+"\" | grep -v grep | awk '{ printf $2 }' | xargs kill 2> /dev/null", + "ps aux | grep \"tail -c\" | grep \""+stderrPath+"\" | grep -v grep | awk '{ printf $2 }' | xargs kill 2> /dev/null"); + + String cmd = Joiner.on("\n").join(cmdParts.build()); + + return ImmutableList.of(runAsRoot ? BashCommands.sudo(cmd) : cmd); + } + + @Override + public abstract int run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/ShellTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/ShellTool.java b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/ShellTool.java new file mode 100644 index 0000000..13bfb62 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/ShellTool.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.internal.ssh; + +import static brooklyn.entity.basic.ConfigKeys.newConfigKey; +import static brooklyn.entity.basic.ConfigKeys.newStringConfigKey; + +import java.io.OutputStream; +import java.util.List; +import java.util.Map; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.ConfigKeys; +import brooklyn.util.os.Os; +import brooklyn.util.time.Duration; + +/** Methods for executing things in an environment (localhost process, or ssh) */ +public interface ShellTool { + + // config which applies to sessions + + public static final ConfigKey<String> PROP_LOCAL_TEMP_DIR = newStringConfigKey( + "localTempDir", + "The directory on the local machine (i.e. running brooklyn) for writing temp files", + Os.mergePaths(Os.tmp(), "brooklyn-"+Os.user()+"-ssh-tmp")); + + // config which applies to calls: + + public static final ConfigKey<Boolean> PROP_RUN_AS_ROOT = newConfigKey("runAsRoot", "When running a script, whether to run as root", Boolean.FALSE); + + public static final ConfigKey<OutputStream> PROP_OUT_STREAM = newConfigKey(OutputStream.class, "out", "Stream to which to capture stdout"); + public static final ConfigKey<OutputStream> PROP_ERR_STREAM = newConfigKey(OutputStream.class, "err", "Stream to which to capture stderr"); + + public static final ConfigKey<Boolean> PROP_NO_EXTRA_OUTPUT = newConfigKey("noExtraOutput", "Suppresses any decorative output such as result code which some tool commands insert", false); + + public static final ConfigKey<String> PROP_SEPARATOR = newConfigKey("separator", "string to insert between caller-supplied commands being executed as commands", " ; "); + + public static final ConfigKey<String> PROP_SCRIPT_DIR = newConfigKey("scriptDir", "directory where scripts should be copied", "/tmp"); + public static final ConfigKey<String> PROP_SCRIPT_HEADER = newConfigKey("scriptHeader", "lines to insert at the start of scripts generated for caller-supplied commands for script execution", "#!/bin/bash -e\n"); + public static final ConfigKey<String> PROP_DIRECT_HEADER = newConfigKey("directHeader", "commands to run at the target before any caller-supplied commands for direct execution", "exec bash -e"); + + ConfigKey<Boolean> PROP_NO_DELETE_SCRIPT = newConfigKey("noDeleteAfterExec", "Retains the generated script file after executing the commands instead of deleting it", false); + + ConfigKey<String> PROP_SUMMARY = ConfigKeys.newStringConfigKey("summary", "Provides a human-readable summary, used in file generation etc"); + + ConfigKey<Duration> PROP_EXEC_TIMEOUT = newConfigKey("execTimeout", "Timeout when executing a script", Duration.PRACTICALLY_FOREVER); + + ConfigKey<Boolean> PROP_EXEC_ASYNC = newConfigKey("execAsync", "Executes the script asynchronously, and then polls for the result (and for stdout/stderr)", false); + + ConfigKey<Duration> PROP_EXEC_ASYNC_POLLING_TIMEOUT = newConfigKey("execAsyncPollTimeout", "Timeout per poll when executing a script asynchronously", Duration.ONE_MINUTE); + + /** + * Executes the set of commands in a shell script. Blocks until completion. + * <p> + * + * Optional properties are the same common ones as for {@link #execCommands(Map, List, Map)} with the addition of: + * <ul> + * <li>{@link #PROP_RUN_AS_ROOT} + * <li>{@link #PROP_SCRIPT_DIR} + * </ul> + * + * @return exit status of script + */ + public int execScript(Map<String,?> props, List<String> commands, Map<String,?> env); + + /** + * @see #execScript(Map, List, Map) + */ + public int execScript(Map<String,?> props, List<String> commands); + + /** + * Executes the set of commands using ssh exec. + * + * This is generally more efficient than ssh shell mode (cf {@link #execScript(Map, List, Map)}), + * but is not suitable if you need env values which are only set on a fully-fledged shell, + * or if you want the entire block executed with root permission. + * + * Common optional properties (which also apply to {@link #execScript(Map, List, Map)}) are: + * <ul> + * <li>{@link #PROP_OUT_STREAM} + * <li>{@link #PROP_ERR_STREAM} + * <li>{@link #PROP_SEPARATOR} (for some modes) + * <li>{@link #PROP_NO_EXTRA_OUTPUT} (often there is no extra output here) + * </ul> + * + * Note that {@link #PROP_RUN_AS_ROOT} is <i>not</i> typically supported here. Prefer {@link #execScript(Map, List, Map)}). + * + * @return exit status of commands + */ + public int execCommands(Map<String,?> properties, List<String> commands, Map<String,?> env); + + /** + * @see #execCommands(Map, List, Map) + */ + public int execCommands(Map<String,?> properties, List<String> commands); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/SshAbstractTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/SshAbstractTool.java b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/SshAbstractTool.java new file mode 100644 index 0000000..ea7a71e --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/SshAbstractTool.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.internal.ssh; + +import static brooklyn.util.net.Networking.checkPortValid; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.File; +import java.util.Map; +import java.util.Set; + +import brooklyn.util.os.Os; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; + +public abstract class SshAbstractTool extends ShellAbstractTool implements SshTool { + + protected final String toString; + + protected final String host; + protected final String user; + protected final String password; + protected final int port; + protected String privateKeyPassphrase; + protected String privateKeyData; + protected File privateKeyFile; + protected boolean strictHostKeyChecking; + protected boolean allocatePTY; + + public static interface SshAction<T> { + void clear() throws Exception; + T create() throws Exception; + } + + public static abstract class AbstractSshToolBuilder<T extends SshTool, B extends AbstractSshToolBuilder<T,B>> { + protected String host; + protected int port = 22; + protected String user = System.getProperty("user.name"); + protected String password; + protected String privateKeyData; + protected String privateKeyPassphrase; + protected Set<String> privateKeyFiles = Sets.newLinkedHashSet(); + protected boolean strictHostKeyChecking = false; + protected boolean allocatePTY = false; + protected File localTempDir = null; + + @SuppressWarnings("unchecked") + protected B self() { + return (B) this; + } + + public B from(Map<String,?> props) { + host = getMandatoryVal(props, PROP_HOST); + port = getOptionalVal(props, PROP_PORT); + user = getOptionalVal(props, PROP_USER); + + password = getOptionalVal(props, PROP_PASSWORD); + + warnOnDeprecated(props, "privateKey", "privateKeyData"); + privateKeyData = getOptionalVal(props, PROP_PRIVATE_KEY_DATA); + privateKeyPassphrase = getOptionalVal(props, PROP_PRIVATE_KEY_PASSPHRASE); + + // for backwards compatibility accept keyFiles and privateKey + // but sshj accepts only a single privateKeyFile; leave blank to use defaults (i.e. ~/.ssh/id_rsa and id_dsa) + warnOnDeprecated(props, "keyFiles", null); + String privateKeyFile = getOptionalVal(props, PROP_PRIVATE_KEY_FILE); + if (privateKeyFile != null) privateKeyFiles.add(privateKeyFile); + + strictHostKeyChecking = getOptionalVal(props, PROP_STRICT_HOST_KEY_CHECKING); + allocatePTY = getOptionalVal(props, PROP_ALLOCATE_PTY); + + String localTempDirPath = getOptionalVal(props, PROP_LOCAL_TEMP_DIR); + localTempDir = (localTempDirPath == null) ? null : new File(Os.tidyPath(localTempDirPath)); + + return self(); + } + public B host(String val) { + this.host = val; return self(); + } + public B user(String val) { + this.user = val; return self(); + } + public B password(String val) { + this.password = val; return self(); + } + public B port(int val) { + this.port = val; return self(); + } + public B privateKeyPassphrase(String val) { + this.privateKeyPassphrase = val; return self(); + } + /** @deprecated 1.4.0, use privateKeyData */ + public B privateKey(String val) { + this.privateKeyData = val; return self(); + } + public B privateKeyData(String val) { + this.privateKeyData = val; return self(); + } + public B privateKeyFile(String val) { + this.privateKeyFiles.add(val); return self(); + } + public B localTempDir(File val) { + this.localTempDir = val; return self(); + } + public abstract T build(); + } + + protected SshAbstractTool(AbstractSshToolBuilder<?,?> builder) { + super(builder.localTempDir); + + host = checkNotNull(builder.host, "host"); + port = builder.port; + user = builder.user; + password = builder.password; + strictHostKeyChecking = builder.strictHostKeyChecking; + allocatePTY = builder.allocatePTY; + privateKeyPassphrase = builder.privateKeyPassphrase; + privateKeyData = builder.privateKeyData; + + if (builder.privateKeyFiles.size() > 1) { + throw new IllegalArgumentException("sshj supports only a single private key-file; " + + "for defaults of ~/.ssh/id_rsa and ~/.ssh/id_dsa leave blank"); + } else if (builder.privateKeyFiles.size() == 1) { + String privateKeyFileStr = Iterables.get(builder.privateKeyFiles, 0); + String amendedKeyFile = privateKeyFileStr.startsWith("~") ? (System.getProperty("user.home")+privateKeyFileStr.substring(1)) : privateKeyFileStr; + privateKeyFile = new File(amendedKeyFile); + } else { + privateKeyFile = null; + } + + checkArgument(host.length() > 0, "host value must not be an empty string"); + checkPortValid(port, "ssh port"); + + toString = String.format("%s@%s:%d", user, host, port); + } + + @Override + public String toString() { + return toString; + } + + public String getHostAddress() { + return this.host; + } + + public String getUsername() { + return this.user; + } + + protected SshException propagate(Exception e, String message) throws SshException { + throw new SshException("(" + toString() + ") " + message + ": " + e.getMessage(), e); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/SshException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/SshException.java b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/SshException.java new file mode 100644 index 0000000..c13aa42 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/internal/ssh/SshException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.internal.ssh; + +public class SshException extends RuntimeException { + + private static final long serialVersionUID = -5690230838066860965L; + + public SshException(String msg) { + super(msg); + } + + public SshException(String msg, Throwable cause) { + super(msg, cause); + } +}
