http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/RestMetricsSender.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/RestMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/RestMetricsSender.java new file mode 100644 index 0000000..5130ae3 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/RestMetricsSender.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.net; + +import com.google.common.base.Stopwatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.ProtocolException; + +/** + * Implements MetricsSender and provides a way of pushing metrics to application metrics history service using REST + * endpoint. + */ +public class RestMetricsSender implements MetricsSender { + private final static Logger LOG = LoggerFactory.getLogger(RestMetricsSender.class); + + private final static String COLLECTOR_URL = "http://%s:8188/ws/v1/timeline/metrics"; + private final String collectorServiceAddress; + + /** + * Creates unconnected RestMetricsSender with endpoint configured as + * http://${metricsHost}:8188/ws/v1/timeline/metrics, + * where ${metricsHost} is specified by metricHost param. + * + * @param metricsHost the hostname that will be used to access application metrics history service. + */ + public RestMetricsSender(String metricsHost) { + collectorServiceAddress = String.format(COLLECTOR_URL, metricsHost); + } + + /** + * Push metrics to the REST endpoint. Connection is always open and closed on every call. + * + * @param payload the payload with metrics to be sent to metrics service + * @return response message either acknowledgement or error, empty on exception + */ + @Override + public String pushMetrics(String payload) { + String responseString = ""; + UrlService svc = null; + Stopwatch timer = new Stopwatch().start(); + + try { + LOG.info("server: {}", collectorServiceAddress); + + svc = getConnectedUrlService(); + responseString = svc.send(payload); + + timer.stop(); + LOG.info("http response time: " + timer.elapsedMillis() + " ms"); + + if (responseString.length() > 0) { + LOG.debug("POST response from server: " + responseString); + } + } catch (MalformedURLException e) { + LOG.error("", e); + } catch (ProtocolException e) { + LOG.error("", e); + } catch (IOException e) { + LOG.error("", e); + } finally { + if (svc != null) { + svc.disconnect(); + } + } + + return responseString; + } + + /** + * Relaxed to protected for testing. + */ + protected UrlService getConnectedUrlService() throws IOException { + return UrlService.newConnection(collectorServiceAddress); + } + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/StdOutMetricsSender.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/StdOutMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/StdOutMetricsSender.java new file mode 100644 index 0000000..aeb4ca8 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/StdOutMetricsSender.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.net; + +import java.io.PrintStream; + +/** + * StdOutMetricsSender dumps metrics to defined PrintStream out. It is useful for testing. + */ +public class StdOutMetricsSender implements MetricsSender { + public final PrintStream out; + private String metricsHostName; + + /** + * Creates new StdOutMetricsSender with specified hostname (only used in messages) and sends output to System.out + * + * @param metricsHostName a name used in printed messages + */ + public StdOutMetricsSender(String metricsHostName) { + this(metricsHostName, System.out); + } + + /** + * Creates new StdOutMetricsSender with specified hostname (only used in messages) and PrintStream which is used as + * an output. + * + * @param metricsHostName a name used in printed messages + * @param out PrintStream that the Sender will write to, can be System.out + */ + public StdOutMetricsSender(String metricsHostName, PrintStream out) { + this.metricsHostName = metricsHostName; + this.out = out; + } + + @Override + public String pushMetrics(String payload) { + out.println("Sending to " + metricsHostName + ": " + payload); + + return "OK"; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/UrlService.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/UrlService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/UrlService.java new file mode 100644 index 0000000..7402438 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/UrlService.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.net; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.HttpURLConnection; +import java.net.URL; + +public class UrlService { + + public static final int CONNECT_TIMEOUT = 20000; + public static final int READ_TIMEOUT = 20000; + private final String address; + private HttpURLConnection conn; + + private UrlService(String address) { + this.address = address; + } + + /** + * Returns a new UrlService connected to specified address. + * + * @param address + * @return + * @throws IOException + */ + public static UrlService newConnection(String address) throws IOException { + UrlService svc = new UrlService(address); + svc.connect(); + + return svc; + } + + public HttpURLConnection connect() throws IOException { + URL url = new URL(address); + conn = (HttpURLConnection) url.openConnection(); + + //TODO: make timeouts configurable + conn.setConnectTimeout(CONNECT_TIMEOUT); + conn.setReadTimeout(READ_TIMEOUT); + conn.setDoInput(true); + conn.setDoOutput(true); + conn.setRequestMethod("POST"); + conn.setRequestProperty("Content-Type", "application/json"); + conn.setRequestProperty("Accept", "*/*"); + + return conn; + } + + public String send(String payload) throws IOException { + if (conn == null) + throw new IllegalStateException("Cannot use unconnected UrlService"); + write(payload); + + return read(); + } + + private String read() throws IOException { + StringBuilder response = new StringBuilder(); + + BufferedReader br = new BufferedReader(new InputStreamReader( + conn.getInputStream())); + String line = null; + while ((line = br.readLine()) != null) { + response.append(line); + } + br.close(); + + return response.toString(); + } + + private void write(String payload) throws IOException { + OutputStreamWriter writer = new OutputStreamWriter(conn.getOutputStream(), + "UTF-8"); + writer.write(payload); + writer.close(); + } + + public void disconnect() { + conn.disconnect(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/Json.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/Json.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/Json.java new file mode 100644 index 0000000..61a3903 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/Json.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util; + +import org.codehaus.jackson.annotate.JsonAutoDetect; +import org.codehaus.jackson.annotate.JsonMethod; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; + +import java.io.IOException; + +/** + * Small wrapper that configures the ObjectMapper with some defaults. + */ +public class Json { + private ObjectMapper myObjectMapper; + + /** + * Creates default Json ObjectMapper that maps fields. + */ + public Json() { + this(false); + } + + /** + * Creates a Json ObjectMapper that maps fields and optionally pretty prints the + * serialized objects. + * + * @param pretty a flag - if true the output will be pretty printed. + */ + public Json(boolean pretty) { + myObjectMapper = new ObjectMapper(); + myObjectMapper.setVisibility(JsonMethod.FIELD, JsonAutoDetect.Visibility.ANY); + if (pretty) { + myObjectMapper.configure(SerializationConfig.Feature.INDENT_OUTPUT, true); + } + } + + public String serialize(Object o) throws IOException { + return myObjectMapper.writeValueAsString(o); + } + + public <T> T deserialize(String content, Class<T> paramClass) throws IOException { + return myObjectMapper.readValue(content, paramClass); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/RandomMetricsProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/RandomMetricsProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/RandomMetricsProvider.java new file mode 100644 index 0000000..7910711 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/RandomMetricsProvider.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util; + +import java.util.Random; + +/** + */ +public class RandomMetricsProvider { + + private double min; + private double max; + private Random rnd; + + public RandomMetricsProvider(double min, double max) { + this.min = min; + this.max = max; + this.rnd = new Random(); + } + + public double next() { + return rnd.nextDouble() * (max - min) + min; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TimeStampProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TimeStampProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TimeStampProvider.java new file mode 100644 index 0000000..ad7ec86 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TimeStampProvider.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .loadsimulator.util; + +/** + */ +public class TimeStampProvider { + private int timeStep; + private long currentTime; + private int sendInterval; + + public TimeStampProvider(long startTime, int timeStep, int sendInterval) { + this.timeStep = timeStep; + this.currentTime = startTime - timeStep; + this.sendInterval = sendInterval; + } + + public long next() { + return currentTime += timeStep; + } + + public long[] timestampsForNextInterval() { + return timestampsForInterval(sendInterval); + } + + private long[] timestampsForInterval(int sendInterval) { + int steps = sendInterval / timeStep; + long[] timestamps = new long[steps]; + + for (int i = 0; i < timestamps.length; i++) { + timestamps[i] = next(); + } + + return timestamps; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java new file mode 100644 index 0000000..a123e57 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java @@ -0,0 +1,229 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Date; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; + +public abstract class AbstractTimelineAggregator implements Runnable { + protected final PhoenixHBaseAccessor hBaseAccessor; + private final Log LOG; + protected final long checkpointDelayMillis; + protected final Integer resultsetFetchSize; + protected Configuration metricsConf; + + public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor, + Configuration metricsConf) { + this.hBaseAccessor = hBaseAccessor; + this.metricsConf = metricsConf; + this.checkpointDelayMillis = SECONDS.toMillis( + metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)); + this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000); + this.LOG = LogFactory.getLog(this.getClass()); + } + + @Override + public void run() { + LOG.info("Started Timeline aggregator thread @ " + new Date()); + Long SLEEP_INTERVAL = getSleepIntervalMillis(); + + while (true) { + long currentTime = System.currentTimeMillis(); + long lastCheckPointTime = -1; + + try { + lastCheckPointTime = readCheckPoint(); + if (isLastCheckPointTooOld(lastCheckPointTime)) { + LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " + + "lastCheckPointTime = " + lastCheckPointTime); + lastCheckPointTime = -1; + } + if (lastCheckPointTime == -1) { + // Assuming first run, save checkpoint and sleep. + // Set checkpoint to 2 minutes in the past to allow the + // agents/collectors to catch up + saveCheckPoint(currentTime - checkpointDelayMillis); + } + } catch (IOException io) { + LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io); + } + long sleepTime = SLEEP_INTERVAL; + + if (lastCheckPointTime != -1) { + LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: " + + ((System.currentTimeMillis() - lastCheckPointTime) / 1000) + + " seconds."); + + long startTime = System.currentTimeMillis(); + boolean success = doWork(lastCheckPointTime, + lastCheckPointTime + SLEEP_INTERVAL); + long executionTime = System.currentTimeMillis() - startTime; + long delta = SLEEP_INTERVAL - executionTime; + + if (delta > 0) { + // Sleep for (configured sleep - time to execute task) + sleepTime = delta; + } else { + // No sleep because last run took too long to execute + LOG.info("Aggregator execution took too long, " + + "cancelling sleep. executionTime = " + executionTime); + sleepTime = 1; + } + + LOG.debug("Aggregator sleep interval = " + sleepTime); + + if (success) { + try { + saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL); + } catch (IOException io) { + LOG.warn("Error saving checkpoint, restarting aggregation at " + + "previous checkpoint."); + } + } + } + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted, continuing with aggregation."); + } + } + } + + private boolean isLastCheckPointTooOld(long checkpoint) { + return checkpoint != -1 && + ((System.currentTimeMillis() - checkpoint) > + getCheckpointCutOffIntervalMillis()); + } + + private long readCheckPoint() { + try { + File checkpoint = new File(getCheckpointLocation()); + if (checkpoint.exists()) { + String contents = FileUtils.readFileToString(checkpoint); + if (contents != null && !contents.isEmpty()) { + return Long.parseLong(contents); + } + } + } catch (IOException io) { + LOG.debug(io); + } + return -1; + } + + private void saveCheckPoint(long checkpointTime) throws IOException { + File checkpoint = new File(getCheckpointLocation()); + if (!checkpoint.exists()) { + boolean done = checkpoint.createNewFile(); + if (!done) { + throw new IOException("Could not create checkpoint at location, " + + getCheckpointLocation()); + } + } + FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime)); + } + + /** + * Read metrics written during the time interval and save the sum and total + * in the aggregate table. + * + * @param startTime Sample start time + * @param endTime Sample end time + */ + protected boolean doWork(long startTime, long endTime) { + LOG.info("Start aggregation cycle @ " + new Date() + ", " + + "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime)); + + boolean success = true; + PhoenixTransactSQL.Condition condition = + prepareMetricQueryCondition(startTime, endTime); + + Connection conn = null; + PreparedStatement stmt = null; + + try { + conn = hBaseAccessor.getConnection(); + stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + + LOG.debug("Query issued @: " + new Date()); + ResultSet rs = stmt.executeQuery(); + LOG.debug("Query returned @: " + new Date()); + + aggregate(rs, startTime, endTime); + LOG.info("End aggregation cycle @ " + new Date()); + + } catch (SQLException e) { + LOG.error("Exception during aggregating metrics.", e); + success = false; + } catch (IOException e) { + LOG.error("Exception during aggregating metrics.", e); + success = false; + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + + LOG.info("End aggregation cycle @ " + new Date()); + return success; + } + + protected abstract PhoenixTransactSQL.Condition + prepareMetricQueryCondition(long startTime, long endTime); + + protected abstract void aggregate(ResultSet rs, long startTime, long endTime) + throws IOException, SQLException; + + protected abstract Long getSleepIntervalMillis(); + + protected abstract Integer getCheckpointCutOffMultiplier(); + + protected Long getCheckpointCutOffIntervalMillis() { + return getCheckpointCutOffMultiplier() * getSleepIntervalMillis(); + } + + protected abstract boolean isDisabled(); + + protected abstract String getCheckpointLocation(); + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java new file mode 100644 index 0000000..f514298 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/Aggregator.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + + +import java.util.Map; + +/** + * + */ +public class Aggregator { + + public double[] calculateAggregates(Map<Long, Double> metricValues) { + double[] values = new double[4]; + double max = Double.MIN_VALUE; + double min = Double.MAX_VALUE; + double sum = 0.0; + int metricCount = 0; + + if (metricValues != null && !metricValues.isEmpty()) { + for (Double value : metricValues.values()) { + // TODO: Some nulls in data - need to investigate null values from host + if (value != null) { + if (value > max) { + max = value; + } + if (value < min) { + min = value; + } + sum += value; + } + } + metricCount = metricValues.values().size(); + } + // BR: WHY ZERO is a good idea? + values[0] = sum; + values[1] = max != Double.MIN_VALUE ? max : 0.0; + values[2] = min != Double.MAX_VALUE ? min : 0.0; + values[3] = metricCount; + + return values; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java new file mode 100644 index 0000000..47435f4 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ConnectionProvider.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + + +import java.sql.Connection; +import java.sql.SQLException; + +/** + * + */ +public interface ConnectionProvider { + public Connection getConnection() throws SQLException; +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java new file mode 100644 index 0000000..652c492 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; + +public class DefaultPhoenixDataSource implements ConnectionProvider { + + static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class); + private static final String ZOOKEEPER_CLIENT_PORT = + "hbase.zookeeper.property.clientPort"; + private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; + private static final String ZNODE_PARENT = "zookeeper.znode.parent"; + + private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s"; + private final String url; + + public DefaultPhoenixDataSource(Configuration hbaseConf) { + String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, + "2181"); + String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM); + String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/hbase"); + if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) { + throw new IllegalStateException("Unable to find Zookeeper quorum to " + + "access HBase store using Phoenix."); + } + + url = String.format(connectionUrl, + zookeeperQuorum, + zookeeperClientPort, + znodeParent); + } + + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. + * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + + LOG.debug("Metric store connection url: " + url); + try { + return DriverManager.getConnection(url); + } catch (SQLException e) { + LOG.warn("Unable to connect to HBase store using Phoenix.", e); + + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java new file mode 100644 index 0000000..9364187 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; + +import java.io.IOException; +import java.net.URL; +import java.sql.SQLException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE; + +public class HBaseTimelineMetricStore extends AbstractService + implements TimelineMetricStore { + + static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class); + private PhoenixHBaseAccessor hBaseAccessor; + + /** + * Construct the service. + * + */ + public HBaseTimelineMetricStore() { + super(HBaseTimelineMetricStore.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = getClass().getClassLoader(); + } + URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE); + URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE); + LOG.info("Found hbase site configuration: " + hbaseResUrl); + LOG.info("Found metric service configuration: " + amsResUrl); + + if (hbaseResUrl == null) { + throw new IllegalStateException("Unable to initialize the metrics " + + "subsystem. No hbase-site present in the classpath."); + } + + if (amsResUrl == null) { + throw new IllegalStateException("Unable to initialize the metrics " + + "subsystem. No ams-site present in the classpath."); + } + + Configuration hbaseConf = new Configuration(true); + hbaseConf.addResource(hbaseResUrl.toURI().toURL()); + Configuration metricsConf = new Configuration(true); + metricsConf.addResource(amsResUrl.toURI().toURL()); + + initializeSubsystem(hbaseConf, metricsConf); + } + + private void initializeSubsystem(Configuration hbaseConf, + Configuration metricsConf) { + hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf); + hBaseAccessor.initMetricSchema(); + + // Start the cluster aggregator + TimelineMetricClusterAggregator minuteClusterAggregator = + new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf); + if (!minuteClusterAggregator.isDisabled()) { + Thread aggregatorThread = new Thread(minuteClusterAggregator); + aggregatorThread.start(); + } + + // Start the cluster aggregator hourly + TimelineMetricClusterAggregatorHourly hourlyClusterAggregator = + new TimelineMetricClusterAggregatorHourly(hBaseAccessor, metricsConf); + if (!hourlyClusterAggregator.isDisabled()) { + Thread aggregatorThread = new Thread(hourlyClusterAggregator); + aggregatorThread.start(); + } + + // Start the 5 minute aggregator + TimelineMetricAggregator minuteHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute + (hBaseAccessor, metricsConf); + if (!minuteHostAggregator.isDisabled()) { + Thread minuteAggregatorThread = new Thread(minuteHostAggregator); + minuteAggregatorThread.start(); + } + + // Start hourly host aggregator + TimelineMetricAggregator hourlyHostAggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly + (hBaseAccessor, metricsConf); + if (!hourlyHostAggregator.isDisabled()) { + Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator); + aggregatorHourlyThread.start(); + } + } + + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + } + + //TODO: update to work with HOSTS_COUNT and METRIC_COUNT + @Override + public TimelineMetrics getTimelineMetrics(List<String> metricNames, + String hostname, String applicationId, String instanceId, + Long startTime, Long endTime, Integer limit, + boolean groupedByHosts) throws SQLException, IOException { + + Condition condition = new Condition(metricNames, hostname, applicationId, + instanceId, startTime, endTime, limit, groupedByHosts); + + if (hostname == null) { + return hBaseAccessor.getAggregateMetricRecords(condition); + } + + return hBaseAccessor.getMetricRecords(condition); + } + + @Override + public TimelineMetric getTimelineMetric(String metricName, String hostname, + String applicationId, String instanceId, Long startTime, + Long endTime, Integer limit) + throws SQLException, IOException { + + TimelineMetrics metrics = hBaseAccessor.getMetricRecords( + new Condition(Collections.singletonList(metricName), hostname, + applicationId, instanceId, startTime, endTime, limit, true) + ); + + TimelineMetric metric = new TimelineMetric(); + List<TimelineMetric> metricList = metrics.getMetrics(); + + if (metricList != null && !metricList.isEmpty()) { + metric.setMetricName(metricList.get(0).getMetricName()); + metric.setAppId(metricList.get(0).getAppId()); + metric.setInstanceId(metricList.get(0).getInstanceId()); + metric.setHostName(metricList.get(0).getHostName()); + // Assumption that metrics are ordered by start time + metric.setStartTime(metricList.get(0).getStartTime()); + Map<Long, Double> metricRecords = new HashMap<Long, Double>(); + for (TimelineMetric timelineMetric : metricList) { + metricRecords.putAll(timelineMetric.getMetricValues()); + } + metric.setMetricValues(metricRecords); + } + + return metric; + } + + + @Override + public TimelinePutResponse putMetrics(TimelineMetrics metrics) + throws SQLException, IOException { + + // Error indicated by the Sql exception + TimelinePutResponse response = new TimelinePutResponse(); + + hBaseAccessor.insertMetricRecords(metrics); + + return response; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java new file mode 100644 index 0000000..61e15d7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.codehaus.jackson.annotate.JsonProperty; +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.map.ObjectMapper; + +import java.io.IOException; + +/** +* +*/ +@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class), + @JsonSubTypes.Type(value = MetricHostAggregate.class)}) +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class MetricAggregate { + private static final ObjectMapper mapper = new ObjectMapper(); + + protected Double sum = 0.0; + protected Double deviation; + protected Double max = Double.MIN_VALUE; + protected Double min = Double.MAX_VALUE; + + public MetricAggregate() { + } + + MetricAggregate(Double sum, Double deviation, Double max, + Double min) { + this.sum = sum; + this.deviation = deviation; + this.max = max; + this.min = min; + } + + void updateSum(Double sum) { + this.sum += sum; + } + + void updateMax(Double max) { + if (max > this.max) { + this.max = max; + } + } + + void updateMin(Double min) { + if (min < this.min) { + this.min = min; + } + } + + @JsonProperty("sum") + Double getSum() { + return sum; + } + + @JsonProperty("deviation") + Double getDeviation() { + return deviation; + } + + @JsonProperty("max") + Double getMax() { + return max; + } + + @JsonProperty("min") + Double getMin() { + return min; + } + + public void setSum(Double sum) { + this.sum = sum; + } + + public void setDeviation(Double deviation) { + this.deviation = deviation; + } + + public void setMax(Double max) { + this.max = max; + } + + public void setMin(Double min) { + this.min = min; + } + + public String toJSON() throws IOException { + return mapper.writeValueAsString(this); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java new file mode 100644 index 0000000..c13c85f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** +* +*/ +public class MetricClusterAggregate extends MetricAggregate { + private int numberOfHosts; + + @JsonCreator + public MetricClusterAggregate() { + } + + MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfHosts = numberOfHosts; + } + + @JsonProperty("numberOfHosts") + int getNumberOfHosts() { + return numberOfHosts; + } + + void updateNumberOfHosts(int count) { + this.numberOfHosts += count; + } + + public void setNumberOfHosts(int numberOfHosts) { + this.numberOfHosts = numberOfHosts; + } + + /** + * Find and update min, max and avg for a minute + */ + void updateAggregates(MetricClusterAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfHosts(hostAggregate.getNumberOfHosts()); + } + + @Override + public String toString() { +// MetricClusterAggregate + return "MetricAggregate{" + + "sum=" + sum + + ", numberOfHosts=" + numberOfHosts + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java new file mode 100644 index 0000000..02cc207 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +/** + * Represents a collection of minute based aggregation of values for + * resolution greater than a minute. + */ +public class MetricHostAggregate extends MetricAggregate { + + private long numberOfSamples = 0; + + @JsonCreator + public MetricHostAggregate() { + super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE); + } + + public MetricHostAggregate(Double sum, int numberOfSamples, + Double deviation, + Double max, Double min) { + super(sum, deviation, max, min); + this.numberOfSamples = numberOfSamples; + } + + @JsonProperty("numberOfSamples") + long getNumberOfSamples() { + return numberOfSamples == 0 ? 1 : numberOfSamples; + } + + void updateNumberOfSamples(long count) { + this.numberOfSamples += count; + } + + public void setNumberOfSamples(long numberOfSamples) { + this.numberOfSamples = numberOfSamples; + } + + public double getAvg() { + return sum / numberOfSamples; + } + + /** + * Find and update min, max and avg for a minute + */ + void updateAggregates(MetricHostAggregate hostAggregate) { + updateMax(hostAggregate.getMax()); + updateMin(hostAggregate.getMin()); + updateSum(hostAggregate.getSum()); + updateNumberOfSamples(hostAggregate.getNumberOfSamples()); + } + + @Override + public String toString() { + return "MetricHostAggregate{" + + "sum=" + sum + + ", numberOfSamples=" + numberOfSamples + + ", deviation=" + deviation + + ", max=" + max + + ", min=" + min + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java new file mode 100644 index 0000000..88a427a --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricsInitializationException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics + .timeline; + +/** + * RuntimeException for initialization of metrics schema. It is RuntimeException + * since this is a not recoverable situation, and should be handled by main or + * service method followed by shutdown. + */ +public class MetricsInitializationException extends RuntimeException { + public MetricsInitializationException() { + } + + public MetricsInitializationException(String msg) { + super(msg); + } + + public MetricsInitializationException(Throwable t) { + super(t); + } + + public MetricsInitializationException(String msg, Throwable t) { + super(msg, t); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/ba3d6926/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java new file mode 100644 index 0000000..4f248b7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java @@ -0,0 +1,678 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounterFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.util.timeline.TimelineUtils; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL; + +/** + * Provides a facade over the Phoenix API to access HBase schema + */ +public class PhoenixHBaseAccessor { + + private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class); + private final Configuration hbaseConf; + private final Configuration metricsConf; + private final RetryCounterFactory retryCounterFactory; + + static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000; + /** + * 4 metrics/min * 60 * 24: Retrieve data for 1 day. + */ + private static final int METRICS_PER_MINUTE = 4; + public static int RESULTSET_LIMIT = (int)TimeUnit.DAYS.toMinutes(1) * + METRICS_PER_MINUTE; + private static ObjectMapper mapper = new ObjectMapper(); + + private static TypeReference<Map<Long, Double>> metricValuesTypeRef = + new TypeReference<Map<Long, Double>>() {}; + private final ConnectionProvider dataSource; + + public PhoenixHBaseAccessor(Configuration hbaseConf, + Configuration metricsConf){ + this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf)); + } + + public PhoenixHBaseAccessor(Configuration hbaseConf, + Configuration metricsConf, + ConnectionProvider dataSource) { + this.hbaseConf = hbaseConf; + this.metricsConf = metricsConf; + RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, 5760); + try { + Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); + } catch (ClassNotFoundException e) { + LOG.error("Phoenix client jar not found in the classpath.", e); + throw new IllegalStateException(e); + } + this.dataSource = dataSource; + this.retryCounterFactory = new RetryCounterFactory( + metricsConf.getInt(GLOBAL_MAX_RETRIES, 10), + (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 5))); + } + + + private Connection getConnectionRetryingOnException() + throws SQLException, InterruptedException { + RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try{ + return getConnection(); + } catch (SQLException e) { + if(!retryCounter.shouldRetry()){ + LOG.error("HBaseAccessor getConnection failed after " + + retryCounter.getMaxAttempts() + " attempts"); + throw e; + } + } + retryCounter.sleepUntilNextRetry(); + } + } + + + /** + * Get JDBC connection to HBase store. Assumption is that the hbase + * configuration is present on the classpath and loaded by the caller into + * the Configuration object. + * Phoenix already caches the HConnection between the client and HBase + * cluster. + * + * @return @java.sql.Connection + */ + public Connection getConnection() throws SQLException { + return dataSource.getConnection(); + } + + public static Map readMetricFromJSON(String json) throws IOException { + return mapper.readValue(json, metricValuesTypeRef); + } + + @SuppressWarnings("unchecked") + static TimelineMetric getTimelineMetricFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setHostName(rs.getString("HOSTNAME")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setStartTime(rs.getLong("START_TIME")); + metric.setType(rs.getString("UNITS")); + metric.setMetricValues( + (Map<Long, Double>) readMetricFromJSON(rs.getString("METRICS"))); + return metric; + } + + static TimelineMetric getTimelineMetricKeyFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setHostName(rs.getString("HOSTNAME")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setType(rs.getString("UNITS")); + return metric; + } + + static MetricHostAggregate getMetricHostAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricHostAggregate metricHostAggregate = new MetricHostAggregate(); + metricHostAggregate.setSum(rs.getDouble("METRIC_SUM")); + metricHostAggregate.setMax(rs.getDouble("METRIC_MAX")); + metricHostAggregate.setMin(rs.getDouble("METRIC_MIN")); + metricHostAggregate.setNumberOfSamples(rs.getLong("METRIC_COUNT")); + + metricHostAggregate.setDeviation(0.0); + return metricHostAggregate; + } + + static TimelineClusterMetric + getTimelineMetricClusterKeyFromResultSet(ResultSet rs) + throws SQLException, IOException { + TimelineClusterMetric metric = new TimelineClusterMetric( + rs.getString("METRIC_NAME"), + rs.getString("APP_ID"), + rs.getString("INSTANCE_ID"), + rs.getLong("SERVER_TIME"), + rs.getString("UNITS")); + + return metric; + } + + static MetricClusterAggregate + getMetricClusterAggregateFromResultSet(ResultSet rs) + throws SQLException { + MetricClusterAggregate agg = new MetricClusterAggregate(); + agg.setSum(rs.getDouble("METRIC_SUM")); + agg.setMax(rs.getDouble("METRIC_MAX")); + agg.setMin(rs.getDouble("METRIC_MIN")); + agg.setNumberOfHosts(rs.getInt("HOSTS_COUNT")); + + agg.setDeviation(0.0); + + return agg; + } + + protected void initMetricSchema() { + Connection conn = null; + Statement stmt = null; + + String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING); + String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION); + String precisionTtl = metricsConf.get(PRECISION_TABLE_TTL, "86400"); + String hostMinTtl = metricsConf.get(HOST_MINUTE_TABLE_TTL, "604800"); + String hostHourTtl = metricsConf.get(HOST_HOUR_TABLE_TTL, "2592000"); + String clusterMinTtl = metricsConf.get(CLUSTER_MINUTE_TABLE_TTL, "2592000"); + String clusterHourTtl = metricsConf.get(CLUSTER_HOUR_TABLE_TTL, "31536000"); + + try { + LOG.info("Initializing metrics schema..."); + conn = getConnectionRetryingOnException(); + stmt = conn.createStatement(); + + stmt.executeUpdate(String.format(CREATE_METRICS_TABLE_SQL, + encoding, precisionTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL, + encoding, hostHourTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL, + encoding, hostMinTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL, + encoding, clusterMinTtl, compression)); + stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL, + encoding, clusterHourTtl, compression)); + conn.commit(); + } catch (SQLException sql) { + LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", sql); + throw new MetricsInitializationException( + "Error creating Metrics Schema in HBase using Phoenix.", sql); + } catch (InterruptedException e) { + LOG.warn("Error creating Metrics Schema in HBase using Phoenix.", e); + throw new MetricsInitializationException( + "Error creating Metrics Schema in HBase using Phoenix.", e); + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + // Ignore + } + } + } + } + + public void insertMetricRecords(TimelineMetrics metrics) + throws SQLException, IOException { + + List<TimelineMetric> timelineMetrics = metrics.getMetrics(); + if (timelineMetrics == null || timelineMetrics.isEmpty()) { + LOG.debug("Empty metrics insert request."); + return; + } + + Connection conn = getConnection(); + PreparedStatement metricRecordStmt = null; + long currentTime = System.currentTimeMillis(); + + try { + metricRecordStmt = conn.prepareStatement(String.format( + UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME)); + + for (TimelineMetric metric : timelineMetrics) { + metricRecordStmt.clearParameters(); + + LOG.trace("host: " + metric.getHostName() + ", " + + "metricName = " + metric.getMetricName() + ", " + + "values: " + metric.getMetricValues()); + Aggregator agg = new Aggregator(); + double[] aggregates = agg.calculateAggregates( + metric.getMetricValues()); + + metricRecordStmt.setString(1, metric.getMetricName()); + metricRecordStmt.setString(2, metric.getHostName()); + metricRecordStmt.setString(3, metric.getAppId()); + metricRecordStmt.setString(4, metric.getInstanceId()); + metricRecordStmt.setLong(5, currentTime); + metricRecordStmt.setLong(6, metric.getStartTime()); + metricRecordStmt.setString(7, metric.getType()); + metricRecordStmt.setDouble(8, aggregates[0]); + metricRecordStmt.setDouble(9, aggregates[1]); + metricRecordStmt.setDouble(10, aggregates[2]); + metricRecordStmt.setLong(11, (long)aggregates[3]); + String json = + TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues()); + metricRecordStmt.setString(12, json); + + try { + metricRecordStmt.executeUpdate(); + } catch (SQLException sql) { + LOG.error(sql); + } + } + + conn.commit(); + + } finally { + if (metricRecordStmt != null) { + try { + metricRecordStmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + } + + + @SuppressWarnings("unchecked") + public TimelineMetrics getMetricRecords(final Condition condition) + throws SQLException, IOException { + + if (condition.isEmpty()) { + throw new SQLException("No filter criteria specified."); + } + + Connection conn = getConnection(); + PreparedStatement stmt = null; + TimelineMetrics metrics = new TimelineMetrics(); + + try { + stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + + ResultSet rs = stmt.executeQuery(); + + while (rs.next()) { + TimelineMetric metric = getTimelineMetricFromResultSet(rs); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric); + } + } + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + return metrics; + } + + public void saveHostAggregateRecords(Map<TimelineMetric, + MetricHostAggregate> hostAggregateMap, String phoenixTableName) + throws SQLException { + + if (hostAggregateMap != null && !hostAggregateMap.isEmpty()) { + Connection conn = getConnection(); + PreparedStatement stmt = null; + + long start = System.currentTimeMillis(); + int rowCount = 0; + + try { + stmt = conn.prepareStatement( + String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName)); + + for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate : + hostAggregateMap.entrySet()) { + + TimelineMetric metric = metricAggregate.getKey(); + MetricHostAggregate hostAggregate = metricAggregate.getValue(); + + rowCount++; + stmt.clearParameters(); + stmt.setString(1, metric.getMetricName()); + stmt.setString(2, metric.getHostName()); + stmt.setString(3, metric.getAppId()); + stmt.setString(4, metric.getInstanceId()); + stmt.setLong(5, metric.getTimestamp()); + stmt.setString(6, metric.getType()); + stmt.setDouble(7, hostAggregate.getSum()); + stmt.setDouble(8, hostAggregate.getMax()); + stmt.setDouble(9, hostAggregate.getMin()); + stmt.setDouble(10, hostAggregate.getNumberOfSamples()); + + try { + // TODO: Why this exception is swallowed + stmt.executeUpdate(); + } catch (SQLException sql) { + LOG.error(sql); + } + + if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { + conn.commit(); + rowCount = 0; + } + + } + + conn.commit(); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + + long end = System.currentTimeMillis(); + + if ((end - start) > 60000l) { + LOG.info("Time to save map: " + (end - start) + ", " + + "thread = " + Thread.currentThread().getClass()); + } + } + } + + /** + * Save Metric aggregate records. + * + * @throws SQLException + */ + public void saveClusterAggregateRecords( + Map<TimelineClusterMetric, MetricClusterAggregate> records) + throws SQLException { + + if (records == null || records.isEmpty()) { + LOG.debug("Empty aggregate records."); + return; + } + + long start = System.currentTimeMillis(); + + Connection conn = getConnection(); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(UPSERT_CLUSTER_AGGREGATE_SQL); + int rowCount = 0; + + for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> + aggregateEntry : records.entrySet()) { + TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); + MetricClusterAggregate aggregate = aggregateEntry.getValue(); + + LOG.trace("clusterMetric = " + clusterMetric + ", " + + "aggregate = " + aggregate); + + rowCount++; + stmt.clearParameters(); + stmt.setString(1, clusterMetric.getMetricName()); + stmt.setString(2, clusterMetric.getAppId()); + stmt.setString(3, clusterMetric.getInstanceId()); + stmt.setLong(4, clusterMetric.getTimestamp()); + stmt.setString(5, clusterMetric.getType()); + stmt.setDouble(6, aggregate.getSum()); + stmt.setInt(7, aggregate.getNumberOfHosts()); + stmt.setDouble(8, aggregate.getMax()); + stmt.setDouble(9, aggregate.getMin()); + + try { + stmt.executeUpdate(); + } catch (SQLException sql) { + // TODO: Why this exception is swallowed + LOG.error(sql); + } + + if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { + conn.commit(); + rowCount = 0; + } + } + + conn.commit(); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + long end = System.currentTimeMillis(); + if ((end - start) > 60000l) { + LOG.info("Time to save: " + (end - start) + ", " + + "thread = " + Thread.currentThread().getName()); + } + } + + /** + * Save Metric aggregate records. + * + * @throws SQLException + */ + public void saveClusterAggregateHourlyRecords( + Map<TimelineClusterMetric, MetricHostAggregate> records, + String tableName) + throws SQLException { + if (records == null || records.isEmpty()) { + LOG.debug("Empty aggregate records."); + return; + } + + long start = System.currentTimeMillis(); + + Connection conn = getConnection(); + PreparedStatement stmt = null; + try { + stmt = conn.prepareStatement(String.format + (UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName)); + int rowCount = 0; + + for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> + aggregateEntry : records.entrySet()) { + TimelineClusterMetric clusterMetric = aggregateEntry.getKey(); + MetricHostAggregate aggregate = aggregateEntry.getValue(); + + LOG.trace("clusterMetric = " + clusterMetric + ", " + + "aggregate = " + aggregate); + + rowCount++; + stmt.clearParameters(); + stmt.setString(1, clusterMetric.getMetricName()); + stmt.setString(2, clusterMetric.getAppId()); + stmt.setString(3, clusterMetric.getInstanceId()); + stmt.setLong(4, clusterMetric.getTimestamp()); + stmt.setString(5, clusterMetric.getType()); + stmt.setDouble(6, aggregate.getSum()); +// stmt.setInt(7, aggregate.getNumberOfHosts()); + stmt.setLong(7, aggregate.getNumberOfSamples()); + stmt.setDouble(8, aggregate.getMax()); + stmt.setDouble(9, aggregate.getMin()); + + try { + stmt.executeUpdate(); + } catch (SQLException sql) { + // we have no way to verify it works!!! + LOG.error(sql); + } + + if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) { + conn.commit(); + rowCount = 0; + } + } + + conn.commit(); + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + long end = System.currentTimeMillis(); + if ((end - start) > 60000l) { + LOG.info("Time to save: " + (end - start) + ", " + + "thread = " + Thread.currentThread().getName()); + } + } + + + public TimelineMetrics getAggregateMetricRecords(final Condition condition) + throws SQLException { + + if (condition.isEmpty()) { + throw new SQLException("No filter criteria specified."); + } + + Connection conn = getConnection(); + PreparedStatement stmt = null; + TimelineMetrics metrics = new TimelineMetrics(); + + try { + stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition); + + ResultSet rs = stmt.executeQuery(); + + while (rs.next()) { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName(rs.getString("METRIC_NAME")); + metric.setAppId(rs.getString("APP_ID")); + metric.setInstanceId(rs.getString("INSTANCE_ID")); + metric.setTimestamp(rs.getLong("SERVER_TIME")); + metric.setStartTime(rs.getLong("SERVER_TIME")); + Map<Long, Double> valueMap = new HashMap<Long, Double>(); + valueMap.put(rs.getLong("SERVER_TIME"), + rs.getDouble("METRIC_SUM") / rs.getInt("HOSTS_COUNT")); + metric.setMetricValues(valueMap); + + if (condition.isGrouped()) { + metrics.addOrMergeTimelineMetric(metric); + } else { + metrics.getMetrics().add(metric); + } + } + + } finally { + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + // Ignore + } + } + if (conn != null) { + try { + conn.close(); + } catch (SQLException sql) { + // Ignore + } + } + } + LOG.info("Aggregate records size: " + metrics.getMetrics().size()); + return metrics; + } +}