Repository: ambari Updated Branches: refs/heads/trunk 5b38d8766 -> 02fd9a796
http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java new file mode 100644 index 0000000..ca141d4 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ambari.server.controller.metrics.timeline.cache;

import net.sf.ehcache.CacheException;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.constructs.blocking.LockTimeoutException;
import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory;
import net.sf.ehcache.constructs.blocking.UpdatingSelfPopulatingCache;
import org.apache.ambari.server.AmbariException;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Self populating cache of timeline metrics keyed by
 * {@link TimelineAppMetricCacheKey}. Before delegating a lookup to the
 * parent cache, the temporal info of the requested key is copied onto the
 * key already stored in the cache.
 */
public class TimelineMetricCache extends UpdatingSelfPopulatingCache {

  private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricCache.class);

  // Cache statistics are printed once per this many calls (debug mode only)
  private static final int STATS_LOG_INTERVAL = 100;
  private static final AtomicInteger printCacheStatsCounter = new AtomicInteger(0);

  /**
   * Creates a SelfPopulatingCache.
   *
   * @param cache @Cache
   * @param factory @CacheEntryFactory
   */
  public TimelineMetricCache(Ehcache cache, UpdatingCacheEntryFactory factory) throws CacheException {
    super(cache, factory);
  }

  /**
   * Get metrics for an app grouped by the requested @TemporalInfo which is a
   * part of the @TimelineAppMetricCacheKey
   *
   * @param key @TimelineAppMetricCacheKey
   * @return @org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics, never
   * null; it carries no metrics when the cache has no value for the key
   * @throws IllegalArgumentException if the key is null or is missing its
   * temporal info or call spec
   */
  public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Fetching metrics with key: " + key);
    }

    // Make sure key is valid
    validateKey(key);

    Element element = get(key);
    TimelineMetrics timelineMetrics = new TimelineMetrics();
    // The entry factory returns a null value when the backend had no metrics,
    // so the element can exist without a value.
    if (element != null && element.getObjectValue() != null) {
      TimelineMetricsCacheValue value = (TimelineMetricsCacheValue) element.getObjectValue();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Returning value from cache: " + value);
      }
      timelineMetrics.setMetrics(new ArrayList<TimelineMetric>(value.getTimelineMetrics().values()));
    }

    if (LOG.isDebugEnabled()) {
      // Print stats every 100 calls - Note: Supported in debug mode only.
      // A modulo test cannot miss the reset point under concurrent increments.
      if (printCacheStatsCounter.getAndIncrement() % STATS_LOG_INTERVAL == 0) {
        LOG.debug("Metrics cache stats => \n" + this.getStatistics());
      }
    }

    return timelineMetrics;
  }

  /**
   * Set new time bounds on the cache key so that update can use the new
   * query window. We do this quietly which means regular get/update logic is
   * not invoked.
   *
   * @param key expected to be a @TimelineAppMetricCacheKey
   * @return the element returned by the parent cache lookup
   */
  @Override
  public Element get(Object key) throws LockTimeoutException {
    Element element = this.getQuiet(key);
    if (element != null) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("key : " + element.getObjectKey());
        LOG.trace("value : " + element.getObjectValue());
      }

      // Set new time boundaries on the key
      TimelineAppMetricCacheKey existingKey = (TimelineAppMetricCacheKey) element.getObjectKey();
      TimelineAppMetricCacheKey newKey = (TimelineAppMetricCacheKey) key;

      if (LOG.isDebugEnabled()) {
        LOG.debug("Existing temporal info: " + existingKey.getTemporalInfo() +
          " for : " + existingKey.getMetricNames());
      }

      existingKey.setTemporalInfo(newKey.getTemporalInfo());

      if (LOG.isDebugEnabled()) {
        LOG.debug("New temporal info: " + newKey.getTemporalInfo() +
          " for : " + existingKey.getMetricNames());
      }
    }

    return super.get(key);
  }

  /**
   * Rejects keys that cannot be used to query the metrics backend.
   *
   * @throws IllegalArgumentException if the key is null, has no temporal info
   * or has no call spec
   */
  private void validateKey(TimelineAppMetricCacheKey key) throws IllegalArgumentException {
    StringBuilder msg = new StringBuilder("Invalid metric key requested.");

    if (key == null) {
      msg.append(" No key provided.");
      throw new IllegalArgumentException(msg.toString());
    }

    boolean throwException = false;

    if (key.getTemporalInfo() == null) {
      msg.append(" No temporal info provided.");
      throwException = true;
    }

    if (key.getSpec() == null) {
      msg.append(" Missing call spec for metric request.");
      // Previously the message was built but the key was still accepted
      throwException = true;
    }

    if (throwException) {
      throw new IllegalArgumentException(msg.toString());
    }
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java new file mode 100644 index 0000000..597f037 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java @@ -0,0 +1,299 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.ambari.server.controller.metrics.timeline.cache;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory;
import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.URLStreamProvider;
import org.apache.ambari.server.controller.metrics.timeline.MetricsRequestHelper;
import org.apache.ambari.server.controller.spi.TemporalInfo;
import org.apache.ambari.server.controller.utilities.StreamProvider;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Cache entry factory that creates cache values by querying the metrics
 * backend and refreshes existing values by fetching only the part of the
 * requested time window that is not yet cached.
 */
@Singleton
public class TimelineMetricCacheEntryFactory implements UpdatingCacheEntryFactory {
  private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricCacheEntryFactory.class);

  // Timestamps below this value are treated as epoch seconds, anything
  // else is treated as milliseconds.
  private static final long EPOCH_SECONDS_UPPER_BOUND = 9999999999L;

  // Not declared final to ease unit test code and allow streamProvider
  // injection
  private MetricsRequestHelper requestHelperForGets;
  private MetricsRequestHelper requestHelperForUpdates;
  private final Long BUFFER_TIME_DIFF_CATCHUP_INTERVAL;

  @Inject
  public TimelineMetricCacheEntryFactory(Configuration configuration) {
    // Longer timeout for first cache miss
    requestHelperForGets = new MetricsRequestHelper(new URLStreamProvider(
      configuration.getMetricsRequestConnectTimeoutMillis(),
      configuration.getMetricsRequestReadTimeoutMillis(),
      ComponentSSLConfiguration.instance()));

    // Timeout setting different from first request timeout
    // Allows stale data to be returned at the behest of performance.
    requestHelperForUpdates = new MetricsRequestHelper(new URLStreamProvider(
      configuration.getMetricsRequestConnectTimeoutMillis(),
      configuration.getMetricsRequestIntervalReadTimeoutMillis(),
      ComponentSSLConfiguration.instance()));

    BUFFER_TIME_DIFF_CATCHUP_INTERVAL = configuration.getMetricRequestBufferTimeCatchupInterval();
  }

  /**
   * This method is called on a get element from cache call when key is not
   * found in cache, returns a value for the key to be cached.
   *
   * @param key @org.apache.ambari.server.controller.metrics.timeline.cache.TimelineAppMetricCacheKey
   * @return @TimelineMetricsCacheValue built from the fetched metrics, or
   * null when the backend returned no metrics
   * @throws Exception
   */
  @Override
  public Object createEntry(Object key) throws Exception {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Creating cache entry since none exists, key = " + key);
    }
    TimelineAppMetricCacheKey metricCacheKey = (TimelineAppMetricCacheKey) key;

    TimelineMetrics timelineMetrics =
      requestHelperForGets.fetchTimelineMetrics(metricCacheKey.getSpec());

    TimelineMetricsCacheValue value = null;

    if (timelineMetrics != null && !timelineMetrics.getMetrics().isEmpty()) {
      Map<String, TimelineMetric> cacheValue =
        new HashMap<String, TimelineMetric>(timelineMetrics.getMetrics().size());
      for (TimelineMetric metric : timelineMetrics.getMetrics()) {
        cacheValue.put(metric.getMetricName(), metric);
      }

      value = new TimelineMetricsCacheValue(
        metricCacheKey.getTemporalInfo().getStartTime(),
        metricCacheKey.getTemporalInfo().getEndTime(),
        cacheValue // Null or empty should prompt a refresh
      );

      if (LOG.isDebugEnabled()) {
        LOG.debug("Created cache entry: " + value);
      }
    }

    return value;
  }

  /**
   * Called on a get call for existing values in the cache,
   * the necessary locking code is present in the get call and this call
   * should update the value of the cache entry before returning.
   *
   * A failed or empty refresh leaves the cached value and its time
   * boundaries untouched so that stale data can still be served.
   *
   * @param key @org.apache.ambari.server.controller.metrics.timeline.cache.TimelineAppMetricCacheKey
   * @param value @TimelineMetricsCacheValue currently stored for the key
   * @throws Exception
   */
  @Override
  public void updateEntryValue(Object key, Object value) throws Exception {
    TimelineAppMetricCacheKey metricCacheKey = (TimelineAppMetricCacheKey) key;
    TimelineMetricsCacheValue existingMetrics = (TimelineMetricsCacheValue) value;

    if (LOG.isDebugEnabled()) {
      LOG.debug("Updating cache entry, key: " + key + ", with value = " + value);
    }

    Long existingSeriesStartTime = existingMetrics.getStartTime();
    Long existingSeriesEndTime = existingMetrics.getEndTime();

    TemporalInfo newTemporalInfo = metricCacheKey.getTemporalInfo();
    Long requestedStartTime = newTemporalInfo.getStartTime();
    Long requestedEndTime = newTemporalInfo.getEndTime();

    // Calculate new start and end times
    URIBuilder uriBuilder = new URIBuilder(metricCacheKey.getSpec());
    Long newStartTime = getRefreshRequestStartTime(existingSeriesStartTime,
      existingSeriesEndTime, requestedStartTime);
    Long newEndTime = getRefreshRequestEndTime(existingSeriesStartTime,
      existingSeriesEndTime, requestedEndTime);

    // Cover complete overlap scenario
    // time axis: |-------- exSt ----- reqSt ------ reqEnd ----- exEnd ---------|
    if (newEndTime > newStartTime &&
        !(newStartTime.equals(existingSeriesStartTime) &&
        newEndTime.equals(existingSeriesEndTime))) {

      if (LOG.isDebugEnabled()) {
        LOG.debug("Existing cached timeseries startTime = " +
          new Date(getMillisecondsTime(existingSeriesStartTime)) + ", endTime = " +
          new Date(getMillisecondsTime(existingSeriesEndTime)));

        LOG.debug("Requested timeseries startTime = " +
          new Date(getMillisecondsTime(newStartTime)) + ", endTime = " +
          new Date(getMillisecondsTime(newEndTime)));
      }

      // Update spec with new start and end time
      uriBuilder.setParameter("startTime", String.valueOf(newStartTime));
      uriBuilder.setParameter("endTime", String.valueOf(newEndTime));

      try {
        TimelineMetrics newTimeSeries = requestHelperForUpdates.fetchTimelineMetrics(uriBuilder.toString());

        if (newTimeSeries == null) {
          // createEntry() treats a null fetch result as possible; handle it
          // like a failed refresh and keep the old window.
          LOG.debug("No metrics returned for refresh request, retaining existing cache entry.");
          return;
        }

        // Update existing time series with new values
        updateTimelineMetricsInCache(newTimeSeries, existingMetrics,
          getMillisecondsTime(requestedStartTime),
          getMillisecondsTime(requestedEndTime));

        // Replace old boundary values
        existingMetrics.setStartTime(requestedStartTime);
        existingMetrics.setEndTime(requestedEndTime);

      } catch (IOException io) {
        // Best effort: stale data is returned when the refresh fails
        if (LOG.isDebugEnabled()) {
          LOG.debug("Exception retrieving metrics.", io);
        }
      }
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Skip updating cache with new startTime = " +
          new Date(getMillisecondsTime(newStartTime)) +
          ", new endTime = " + new Date(getMillisecondsTime(newEndTime)));
      }
    }
  }

  /**
   * Update cache with new timeseries data. For every metric already cached,
   * values outside of the requested window are dropped and the new values
   * are merged in; metrics not cached yet are added as is.
   *
   * @param newMetrics freshly fetched metrics, ignored when null
   * @param timelineMetricsCacheValue cached value that is modified in place
   * @param requestedStartTime start of the requested window in milliseconds
   * @param requestedEndTime end of the requested window in milliseconds
   */
  protected void updateTimelineMetricsInCache(TimelineMetrics newMetrics,
      TimelineMetricsCacheValue timelineMetricsCacheValue,
      Long requestedStartTime, Long requestedEndTime) {

    if (newMetrics == null || newMetrics.getMetrics() == null) {
      return;
    }

    Map<String, TimelineMetric> existingTimelineMetricMap = timelineMetricsCacheValue.getTimelineMetrics();

    // NOTE: Metrics names so far are unique, the Map optimization avoids
    // multiple iterations of the List
    for (TimelineMetric timelineMetric : newMetrics.getMetrics()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("New metric: " + timelineMetric.getMetricName() +
          " # " + timelineMetric.getMetricValues().size() +
          describeTimeRange(timelineMetric.getMetricValues()));
      }

      TimelineMetric existingMetric = existingTimelineMetricMap.get(timelineMetric.getMetricName());

      if (existingMetric != null) {
        Map<Long, Double> existingMetricValues = existingMetric.getMetricValues();
        if (LOG.isTraceEnabled()) {
          LOG.trace("Existing metric: " + timelineMetric.getMetricName() +
            " # " + existingMetricValues.size());
        }

        Iterator<Map.Entry<Long, Double>> valueIterator = existingMetricValues.entrySet().iterator();

        // Remove old values
        // Assumption: All return value are millis
        while (valueIterator.hasNext()) {
          Map.Entry<Long, Double> metricEntry = valueIterator.next();
          if (metricEntry.getKey() < requestedStartTime
              || metricEntry.getKey() > requestedEndTime) {
            valueIterator.remove();
          }
        }

        // Add new ones
        existingMetricValues.putAll(timelineMetric.getMetricValues());

        if (LOG.isTraceEnabled()) {
          LOG.trace("Merged metric: " + timelineMetric.getMetricName() + ", " +
            "Final size: " + existingMetricValues.size() +
            describeTimeRange(existingMetricValues));
        }
      } else {
        existingTimelineMetricMap.put(timelineMetric.getMetricName(), timelineMetric);
      }
    }
  }

  /**
   * Builds the ", startTime = x, endTime = y" suffix used by trace messages.
   * Returns an empty string for an empty series, because asking a sorted map
   * without entries for its first key throws NoSuchElementException.
   */
  private String describeTimeRange(Map<Long, Double> metricValues) {
    if (metricValues.isEmpty()) {
      return "";
    }
    TreeMap<Long, Double> sortedMetrics = new TreeMap<Long, Double>(metricValues);
    return ", startTime = " + sortedMetrics.firstKey() + ", endTime = " + sortedMetrics.lastKey();
  }

  // Scenario: Regular graph updates
  // time axis: |-------- exSt ----- reqSt ------ exEnd ----- reqEnd ---------|
  // Scenario: Selective graph updates
  // time axis: |-------- exSt ----- exEnd ------ reqSt ----- reqEnd ---------|
  // Scenario: Extended time window
  // time axis: |-------- reqSt ----- exSt ------- exEnd ---- reqEnd ---------|
  protected Long getRefreshRequestStartTime(Long existingSeriesStartTime,
      Long existingSeriesEndTime, Long requestedStartTime) {
    Long diff = requestedStartTime - existingSeriesEndTime;
    Long startTime = requestedStartTime;

    if (diff < 0 && requestedStartTime > existingSeriesStartTime) {
      // Regular graph updates
      // Overlapping timeseries data refresh only new part
      // Account for missing data on the trailing edge due to buffering
      startTime = getTimeShiftedStartTime(existingSeriesEndTime);
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("Requesting timeseries data with new startTime = " +
        new Date(getMillisecondsTime(startTime)));
    }

    return startTime;
  }

  // Scenario: Regular graph updates
  // time axis: |-------- exSt ----- reqSt ------ exEnd ----- reqEnd ---------|
  // Scenario: Old data request /w overlap
  // time axis: |-------- reqSt ----- exSt ------ reqEnd ----- exEnd --------|
  // Scenario: Very Old data request /wo overlap
  // time axis: |-------- reqSt ----- reqEnd ------ exSt ----- exEnd --------|
  protected Long getRefreshRequestEndTime(Long existingSeriesStartTime,
      Long existingSeriesEndTime, Long requestedEndTime) {
    Long endTime = requestedEndTime;
    Long diff = requestedEndTime - existingSeriesEndTime;
    if (diff < 0 && requestedEndTime > existingSeriesStartTime) {
      // End time overlaps existing timeseries
      // Get only older data that might not be in the cache
      endTime = existingSeriesStartTime;
    }

    if (LOG.isTraceEnabled()) {
      LOG.trace("Requesting timeseries data with new endTime = " +
        new Date(getMillisecondsTime(endTime)));
    }
    return endTime;
  }

  /**
   * Time shift by a constant taking into account Epoch vs millis
   */
  private long getTimeShiftedStartTime(long startTime) {
    if (startTime < EPOCH_SECONDS_UPPER_BOUND) {
      // Epoch time
      return startTime - (BUFFER_TIME_DIFF_CATCHUP_INTERVAL / 1000);
    } else {
      return startTime - BUFFER_TIME_DIFF_CATCHUP_INTERVAL;
    }
  }

  /**
   * Converts a timestamp to milliseconds when it looks like epoch seconds,
   * otherwise returns it unchanged.
   */
  private long getMillisecondsTime(long time) {
    if (time < EPOCH_SECONDS_UPPER_BOUND) {
      return time * 1000;
    } else {
      return time;
    }
  }
}
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.controller.metrics.timeline.cache; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import net.sf.ehcache.Cache; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.config.CacheConfiguration; +import net.sf.ehcache.store.MemoryStoreEvictionPolicy; +import org.apache.ambari.server.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache implementation that provides ability to perform incremental reads + * from Metrics backend and reduce the amount of calls between Ambari and the + * Metrics backend. 
+ */ +@Singleton +public class TimelineMetricCacheProvider { + private TimelineMetricCache timelineMetricsCache; + private volatile boolean isCacheInitialized = false; + public static final String TIMELINE_METRIC_CACHE_INSTANCE_NAME = "timelineMetricCache"; + + Configuration configuration; + TimelineMetricCacheEntryFactory cacheEntryFactory; + + private final static Logger LOG = LoggerFactory.getLogger(TimelineMetricCacheProvider.class); + + @Inject + public TimelineMetricCacheProvider(Configuration configuration, + TimelineMetricCacheEntryFactory cacheEntryFactory) { + this.configuration = configuration; + this.cacheEntryFactory = cacheEntryFactory; + } + + private synchronized void initializeCache() { + // Check in case of contention to avoid ObjectExistsException + if (isCacheInitialized) { + return; + } + + //Create a singleton CacheManager using defaults + System.setProperty("net.sf.ehcache.skipUpdateCheck", "true"); + CacheManager manager = CacheManager.getInstance(); + + LOG.info("Creating Metrics Cache with timeouts => ttl = " + + configuration.getMetricCacheTTLSeconds() + ", idle = " + + configuration.getMetricCacheIdleSeconds()); + + //Create a Cache specifying its configuration. 
+ Cache cache = new Cache( + new CacheConfiguration(TIMELINE_METRIC_CACHE_INSTANCE_NAME, configuration.getMetricCacheMaxEntries()) + .timeToLiveSeconds(configuration.getMetricCacheTTLSeconds()) // 1 hour + .timeToIdleSeconds(configuration.getMetricCacheIdleSeconds()) // 5 minutes + .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU) + .eternal(false) + .diskPersistent(false) + .overflowToDisk(false) + .statistics(LOG.isDebugEnabled() || LOG.isTraceEnabled()) + ); + + timelineMetricsCache = new TimelineMetricCache(cache, cacheEntryFactory); + + LOG.info("Registering metrics cache with provider: name = " + + cache.getName() + ", guid: " + cache.getGuid()); + + manager.addCache(timelineMetricsCache); + + isCacheInitialized = true; + } + + /** + * Return an instance of a Ehcache + * @return @TimelineMetricCache or null if caching is disabled through config. + */ + public TimelineMetricCache getTimelineMetricsCache() { + if (configuration.isMetricsCacheDisabled()) { + return null; + } + + if (!isCacheInitialized) { + initializeCache(); + } + return timelineMetricsCache; + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java new file mode 100644 index 0000000..f9f1f54 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
package org.apache.ambari.server.controller.metrics.timeline.cache;

import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;

import java.util.Date;
import java.util.Map;

/**
 * Wrapper object for metrics returned from AMS that includes the query time
 * window.
 */
public class TimelineMetricsCacheValue {
  // Timestamps below this value are treated as epoch seconds, anything
  // else is treated as milliseconds.
  private static final long EPOCH_SECONDS_UPPER_BOUND = 9999999999L;

  private Long startTime;
  private Long endTime;
  private Map<String, TimelineMetric> timelineMetrics;

  /**
   * @param startTime start of the query window
   * @param endTime end of the query window
   * @param timelineMetrics map of metric name to metric
   */
  public TimelineMetricsCacheValue(Long startTime, Long endTime, Map<String, TimelineMetric> timelineMetrics) {
    this.startTime = startTime;
    this.endTime = endTime;
    this.timelineMetrics = timelineMetrics;
  }

  public Map<String, TimelineMetric> getTimelineMetrics() {
    return timelineMetrics;
  }

  /**
   * Map of metricName to metric values. Works on the assumption that metric
   * name is unique
   */
  public void setTimelineMetrics(Map<String, TimelineMetric> timelineMetrics) {
    this.timelineMetrics = timelineMetrics;
  }

  public Long getStartTime() {
    return startTime;
  }

  public void setStartTime(Long startTime) {
    this.startTime = startTime;
  }

  public Long getEndTime() {
    return endTime;
  }

  public void setEndTime(Long endTime) {
    this.endTime = endTime;
  }

  /**
   * Converts a timestamp to milliseconds when it looks like epoch seconds,
   * otherwise returns it unchanged.
   */
  private long getMillisecondsTime(long time) {
    if (time < EPOCH_SECONDS_UPPER_BOUND) {
      return time * 1000;
    } else {
      return time;
    }
  }

  /**
   * Renders a boundary for toString(). A boundary that is not set is printed
   * as "null" instead of failing on unboxing.
   */
  private String describeTime(Long time) {
    if (time == null) {
      return "null";
    }
    return new Date(getMillisecondsTime(time)).toString();
  }

  /**
   * Summary of the value: metric names, time window and the number of
   * values per metric. Safe to call when the boundaries or the metric map
   * are not set.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder("TimelineMetricsCacheValue {" +
      "metricNames = " + (timelineMetrics == null ? null : timelineMetrics.keySet()) +
      ", startTime = " + describeTime(startTime) +
      ", endTime = " + describeTime(endTime) +
      ", timelineMetrics =");

    if (timelineMetrics != null) {
      for (TimelineMetric metric : timelineMetrics.values()) {
        sb.append(" { ");
        sb.append(metric.getMetricName());
        sb.append(" # ");
        sb.append(metric.getMetricValues().size());
        sb.append(" }");
      }
    }
    sb.append("}");
    return sb.toString();
  }
}
import java.io.InputStream; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -27,11 +28,18 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.google.inject.Binder; +import com.google.inject.Module; +import com.google.inject.util.Modules; +import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.jmx.TestStreamProvider; import org.apache.ambari.server.controller.metrics.JMXPropertyProviderTest; import org.apache.ambari.server.controller.metrics.MetricsServiceProvider; import org.apache.ambari.server.controller.metrics.ganglia.GangliaPropertyProviderTest.TestGangliaHostProvider; import org.apache.ambari.server.controller.metrics.ganglia.GangliaPropertyProviderTest.TestGangliaServiceProvider; +import org.apache.ambari.server.controller.metrics.timeline.MetricsRequestHelper; +import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory; +import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.PropertyProvider; import org.apache.ambari.server.controller.spi.Request; @@ -52,6 +60,7 @@ import org.apache.ambari.server.state.stack.Metric; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import com.google.inject.Guice; @@ -70,11 +79,29 @@ public class StackDefinedPropertyProviderTest { private Injector injector = null; private OrmTestHelper helper = null; + private static TimelineMetricCacheEntryFactory cacheEntryFactory; + private static TimelineMetricCacheProvider cacheProvider; + + @BeforeClass + public static void setupCache() { + cacheEntryFactory = new TimelineMetricCacheEntryFactory(new Configuration()); + cacheProvider = new TimelineMetricCacheProvider(new 
Configuration(), cacheEntryFactory); + } + + public class TestModuleWithCacheProvider implements Module { + @Override + public void configure(Binder binder) { + binder.bind(TimelineMetricCacheProvider.class).toInstance(cacheProvider); + } + } + @Before public void setup() throws Exception { InMemoryDefaultTestModule module = new InMemoryDefaultTestModule(); - - injector = Guice.createInjector(module); + // Use the same cache provider to ensure there is only one instance of + // Cache available. The @net.sf.ehcache.CacheManager is a singleton and + // does not allow multiple instances with the same cache name to be registered. + injector = Guice.createInjector(Modules.override(module).with(new TestModuleWithCacheProvider())); injector.getInstance(GuiceJpaInitializer.class); StackDefinedPropertyProvider.init(injector); @@ -404,10 +431,10 @@ public class StackDefinedPropertyProviderTest { Assert.assertEquals(12, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AggregateContainersReleased"))); Assert.assertEquals(8192, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AvailableMB"))); Assert.assertEquals(1, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AvailableVCores"))); - Assert.assertEquals(47, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AppsSubmitted"))); + Assert.assertEquals(47, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default", "AppsSubmitted"))); - Assert.assertEquals(4, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AggregateContainersAllocated"))); - Assert.assertEquals(4, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AggregateContainersReleased"))); + Assert.assertEquals(4, 
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AggregateContainersAllocated"))); + Assert.assertEquals(4, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AggregateContainersReleased"))); Assert.assertEquals(6048, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AvailableMB"))); Assert.assertEquals(1, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AvailableVCores"))); Assert.assertEquals(1, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue", "AppsSubmitted"))); @@ -688,8 +715,6 @@ public class StackDefinedPropertyProviderTest { Assert.assertEquals(8444, resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/dfs/journalnode/cluster/mycluster", "lastWrittenTxId"))); } - - @Test public void testPopulateResources_jmx_Storm() throws Exception { // Adjust stack version for cluster @@ -818,7 +843,6 @@ public class StackDefinedPropertyProviderTest { Assert.assertTrue(map.get("metrics/hbase/master").containsKey("IsActiveMaster")); } - @Test public void testPopulateResources_params_category5() throws Exception { org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider streamProvider = @@ -1042,7 +1066,7 @@ public class StackDefinedPropertyProviderTest { org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider streamProvider = new org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider("ams/aggregate_component_metric.json"); - + injectCacheEntryFactoryWithStreamProvider(streamProvider); JMXPropertyProviderTest.TestJMXHostProvider jmxHostProvider = new JMXPropertyProviderTest.TestJMXHostProvider(true); TestGangliaHostProvider hostProvider = new TestGangliaHostProvider(); MetricsServiceProvider serviceProvider = new MetricsServiceProvider() { @@ -1085,4 +1109,10 @@ public class 
StackDefinedPropertyProviderTest { Assert.assertEquals(32, metricsArray.length); } + /* Since streamProviders are not injected this hack becomes necessary */ + private void injectCacheEntryFactoryWithStreamProvider(StreamProvider streamProvider) throws Exception { + Field field = TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets"); + field.setAccessible(true); + field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider)); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java index c8007c8..71febc9 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java @@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics.timeline; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.api.services.AmbariMetaInfo; import org.apache.ambari.server.configuration.ComponentSSLConfiguration; +import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.internal.PropertyInfo; @@ -27,29 +28,34 @@ import org.apache.ambari.server.controller.internal.ResourceImpl; import org.apache.ambari.server.controller.internal.TemporalInfoImpl; import org.apache.ambari.server.controller.metrics.MetricHostProvider; import 
org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider; +import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory; +import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.spi.TemporalInfo; import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.controller.utilities.StreamProvider; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.Clusters; import org.apache.ambari.server.state.ComponentInfo; import org.apache.ambari.server.state.StackId; import org.apache.http.client.utils.URIBuilder; import org.junit.Assert; -import org.junit.Ignore; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -66,6 +72,7 @@ import static org.mockito.Mockito.mock; @RunWith(PowerMockRunner.class) @PrepareForTest({AMSPropertyProvider.class, AmbariServer.class}) +@PowerMockIgnore({"javax.xml.parsers.*", "org.xml.sax.*", "net.sf.ehcache.*", "org.apache.log4j.*"}) public class AMSPropertyProviderTest { private static final String PROPERTY_ID1 = PropertyHelper.getPropertyId("metrics/cpu", "cpu_user"); private static final String PROPERTY_ID2 = 
PropertyHelper.getPropertyId("metrics/memory", "mem_free"); @@ -82,10 +89,20 @@ public class AMSPropertyProviderTest { private static final String EMBEDDED_METRICS_FILE_PATH = FILE_PATH_PREFIX + "embedded_host_metric.json"; private static final String AGGREGATE_METRICS_FILE_PATH = FILE_PATH_PREFIX + "aggregate_component_metric.json"; + private static TimelineMetricCacheEntryFactory cacheEntryFactory; + private static TimelineMetricCacheProvider cacheProvider; + + @BeforeClass + public static void setupCache() { + cacheEntryFactory = new TimelineMetricCacheEntryFactory(new Configuration()); + cacheProvider = new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory); + } + @Test public void testPopulateResourcesForSingleHostMetric() throws Exception { setUpCommonMocks(); TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -94,6 +111,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, HOST_NAME_PROPERTY_ID @@ -129,6 +147,7 @@ public class AMSPropertyProviderTest { // given TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); Map<String, Map<String, PropertyInfo>> propertyIds = PropertyHelper.getMetricPropertyIds(Resource.Type.Host); @@ -136,6 +155,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, HOST_NAME_PROPERTY_ID @@ -169,6 +189,7 @@ public 
class AMSPropertyProviderTest { public void testPopulateResourcesForMultipleHostMetricscPointInTime() throws Exception { setUpCommonMocks(); TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -177,6 +198,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, HOST_NAME_PROPERTY_ID @@ -217,6 +239,7 @@ public class AMSPropertyProviderTest { public void testPopulateResourcesForMultipleHostMetrics() throws Exception { setUpCommonMocks(); TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -225,6 +248,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, HOST_NAME_PROPERTY_ID @@ -276,6 +300,7 @@ public class AMSPropertyProviderTest { public void testPopulateResourcesForRegexpMetrics() throws Exception { setUpCommonMocks(); TestStreamProvider streamProvider = new TestStreamProvider(MULTIPLE_COMPONENT_REGEXP_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -291,6 +316,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, COMPONENT_NAME_PROPERTY_ID @@ -327,6 +353,7 @@ public 
class AMSPropertyProviderTest { public void testPopulateResourcesForSingleComponentMetric() throws Exception { setUpCommonMocks(); TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_COMPONENT_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -337,6 +364,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, COMPONENT_NAME_PROPERTY_ID @@ -394,6 +422,7 @@ public class AMSPropertyProviderTest { PowerMock.replayAll(); TestStreamProvider streamProvider = new TestStreamProvider(EMBEDDED_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -404,6 +433,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, COMPONENT_NAME_PROPERTY_ID @@ -460,6 +490,7 @@ public class AMSPropertyProviderTest { PowerMock.replayAll(); TestStreamProvider streamProvider = new TestStreamProvider(AGGREGATE_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -471,6 +502,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, COMPONENT_NAME_PROPERTY_ID @@ -504,6 +536,7 @@ public class AMSPropertyProviderTest { public void testFilterOutOfBandMetricData() throws Exception { setUpCommonMocks(); TestStreamProvider streamProvider = new 
TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -512,6 +545,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, HOST_NAME_PROPERTY_ID @@ -571,6 +605,7 @@ public class AMSPropertyProviderTest { setUpCommonMocks(); TestStreamProviderForHostComponentHostMetricsTest streamProvider = new TestStreamProviderForHostComponentHostMetricsTest(null); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -579,6 +614,7 @@ public class AMSPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID, HOST_NAME_PROPERTY_ID, @@ -696,7 +732,16 @@ public class AMSPropertyProviderTest { expect(ambariMetaInfo.getComponent(anyObject(String.class),anyObject(String.class), anyObject(String.class), anyObject(String.class))) .andReturn(componentInfo).anyTimes(); + replay(ams, clusters, cluster, ambariMetaInfo); PowerMock.replayAll(); } + + /* Since streamProviders are not injected this hack becomes necessary */ + private void injectCacheEntryFactoryWithStreamProvider(StreamProvider streamProvider) throws Exception { + Field field = TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets"); + field.setAccessible(true); + field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider)); + } + } http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java 
---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java index 3ee64fa..99a2102 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java @@ -18,18 +18,24 @@ package org.apache.ambari.server.controller.metrics.timeline; import org.apache.ambari.server.configuration.ComponentSSLConfiguration; +import org.apache.ambari.server.configuration.Configuration; import org.apache.ambari.server.controller.internal.PropertyInfo; import org.apache.ambari.server.controller.internal.ResourceImpl; import org.apache.ambari.server.controller.internal.TemporalInfoImpl; import org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider; +import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory; +import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider; import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.TemporalInfo; import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.controller.utilities.StreamProvider; import org.apache.http.client.utils.URIBuilder; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import java.io.File; +import java.lang.reflect.Field; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -44,9 +50,19 @@ public class AMSReportPropertyProviderTest { private static final String SINGLE_HOST_METRICS_FILE_PATH = FILE_PATH_PREFIX 
+ "single_host_metric.json"; private static final String AGGREGATE_CLUSTER_METRICS_FILE_PATH = FILE_PATH_PREFIX + "aggregate_cluster_metrics.json"; + private static TimelineMetricCacheEntryFactory cacheEntryFactory; + private static TimelineMetricCacheProvider cacheProvider; + + @BeforeClass + public static void setupCache() { + cacheEntryFactory = new TimelineMetricCacheEntryFactory(new Configuration()); + cacheProvider = new TimelineMetricCacheProvider(new Configuration(), cacheEntryFactory); + } + @Test public void testPopulateResources() throws Exception { TestStreamProvider streamProvider = new TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -58,6 +74,7 @@ public class AMSReportPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID ); @@ -88,6 +105,7 @@ public class AMSReportPropertyProviderTest { @Test public void testPopulateResourceWithAggregateFunction() throws Exception { TestStreamProvider streamProvider = new TestStreamProvider(AGGREGATE_CLUSTER_METRICS_FILE_PATH); + injectCacheEntryFactoryWithStreamProvider(streamProvider); TestMetricHostProvider metricHostProvider = new TestMetricHostProvider(); ComponentSSLConfiguration sslConfiguration = mock(ComponentSSLConfiguration.class); @@ -99,6 +117,7 @@ public class AMSReportPropertyProviderTest { propertyIds, streamProvider, sslConfiguration, + cacheProvider, metricHostProvider, CLUSTER_NAME_PROPERTY_ID ); @@ -125,4 +144,11 @@ public class AMSReportPropertyProviderTest { Number[][] val = (Number[][]) res.getPropertyValue("metrics/cpu/User._sum"); Assert.assertEquals(90, val.length); } + + /* Since streamProviders are not injected this hack becomes necessary */ + private void 
injectCacheEntryFactoryWithStreamProvider(StreamProvider streamProvider) throws Exception { + Field field = TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets"); + field.setAccessible(true); + field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider)); + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java new file mode 100644 index 0000000..b16024b --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java @@ -0,0 +1,365 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.server.controller.metrics.timeline.cache; + +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import junit.framework.Assert; +import net.sf.ehcache.Cache; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory; +import net.sf.ehcache.constructs.blocking.UpdatingSelfPopulatingCache; +import org.apache.ambari.server.configuration.Configuration; +import org.apache.ambari.server.controller.internal.TemporalInfoImpl; +import org.apache.ambari.server.controller.spi.TemporalInfo; +import org.apache.ambari.server.state.stack.OsFamily; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.easymock.IAnswer; +import org.junit.After; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider.TIMELINE_METRIC_CACHE_INSTANCE_NAME; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createMock; +import static org.easymock.EasyMock.createMockBuilder; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.getCurrentArguments; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +public class TimelineMetricCacheTest { + + private TimelineMetricCacheProvider getMetricCacheProvider( + final Configuration configuration, + final TimelineMetricCacheEntryFactory cacheEntryFactory) { + + Injector injector = Guice.createInjector(new Module() { + @Override + public void 
configure(Binder binder) { + binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class)); + binder.bind(Configuration.class).toInstance(configuration); + binder.bind(TimelineMetricCacheEntryFactory.class).toInstance(cacheEntryFactory); + } + }); + return injector.getInstance(TimelineMetricCacheProvider.class); + } + + @After + public void removeCacheInstance() { + // Avoids Object Exists Exception on unit tests by adding a new cache for + // every provider. + CacheManager manager = CacheManager.getInstance(); + manager.removeCache(TIMELINE_METRIC_CACHE_INSTANCE_NAME); + } + + // General cache behavior demonstration + @Test + public void testSelfPopulatingCacheUpdates() throws Exception { + UpdatingCacheEntryFactory cacheEntryFactory = createMock(UpdatingCacheEntryFactory.class); + + StringBuilder value = new StringBuilder("b"); + + expect(cacheEntryFactory.createEntry("a")).andReturn(value); + cacheEntryFactory.updateEntryValue("a", value); + expectLastCall().andAnswer(new IAnswer<Object>() { + @Override + public Object answer() throws Throwable { + String key = (String) getCurrentArguments()[0]; + StringBuilder value = (StringBuilder) getCurrentArguments()[1]; + System.out.println("key = " + key + ", value = " + value); + value.append("c"); + return null; + } + }); + + replay(cacheEntryFactory); + + CacheManager manager = CacheManager.getInstance(); + Cache cache = new Cache("test", 10, false, false, 10000, 10000); + UpdatingSelfPopulatingCache testCache = new UpdatingSelfPopulatingCache(cache, cacheEntryFactory); + manager.addCache(testCache); + + Assert.assertEquals("b", testCache.get("a").getObjectValue().toString()); + Assert.assertEquals("bc", testCache.get("a").getObjectValue().toString()); + + verify(cacheEntryFactory); + } + + @Test + public void testTimlineMetricCacheProviderGets() throws Exception { + Configuration configuration = createNiceMock(Configuration.class); + expect(configuration.getMetricCacheMaxEntries()).andReturn(1000); + 
expect(configuration.getMetricCacheTTLSeconds()).andReturn(3600); + expect(configuration.getMetricCacheIdleSeconds()).andReturn(100); + + final long now = System.currentTimeMillis(); + Map<String, TimelineMetric> valueMap = new HashMap<String, TimelineMetric>(); + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName("cpu_user"); + timelineMetric.setAppId("app1"); + Map<Long, Double> metricValues = new HashMap<Long, Double>(); + metricValues.put(now + 100, 1.0); + metricValues.put(now + 200, 2.0); + metricValues.put(now + 300, 3.0); + timelineMetric.setMetricValues(metricValues); + valueMap.put("cpu_user", timelineMetric); + + TimelineMetricCacheEntryFactory cacheEntryFactory = createMock(TimelineMetricCacheEntryFactory.class); + + TimelineAppMetricCacheKey queryKey = new TimelineAppMetricCacheKey( + Collections.singleton("cpu_user"), + "app1", + new TemporalInfoImpl(now, now + 1000, 1) + ); + TimelineMetricsCacheValue value = new TimelineMetricsCacheValue(now, now + 1000, valueMap); + TimelineAppMetricCacheKey testKey = new TimelineAppMetricCacheKey( + Collections.singleton("cpu_user"), + "app1", + new TemporalInfoImpl(now, now + 2000, 1) + ); + + expect(cacheEntryFactory.createEntry(anyObject())).andReturn(value); + cacheEntryFactory.updateEntryValue(testKey, value); + expectLastCall().once(); + + replay(configuration, cacheEntryFactory); + + TimelineMetricCacheProvider cacheProvider = getMetricCacheProvider(configuration, cacheEntryFactory); + TimelineMetricCache cache = cacheProvider.getTimelineMetricsCache(); + + // call to get + TimelineMetrics metrics = cache.getAppTimelineMetricsFromCache(queryKey); + List<TimelineMetric> metricsList = metrics.getMetrics(); + Assert.assertEquals(1, metricsList.size()); + TimelineMetric metric = metricsList.iterator().next(); + Assert.assertEquals("cpu_user", metric.getMetricName()); + Assert.assertEquals("app1", metric.getAppId()); + Assert.assertSame(metricValues, metric.getMetricValues()); + 
+ // call to update with new key + metrics = cache.getAppTimelineMetricsFromCache(testKey); + metricsList = metrics.getMetrics(); + Assert.assertEquals(1, metricsList.size()); + Assert.assertEquals("cpu_user", metric.getMetricName()); + Assert.assertEquals("app1", metric.getAppId()); + Assert.assertSame(metricValues, metric.getMetricValues()); + + verify(configuration, cacheEntryFactory); + } + + @Test + @SuppressWarnings("all") + public void testCacheUpdateBoundsOnVariousRequestScenarios() throws Exception { + Configuration configuration = createNiceMock(Configuration.class); + expect(configuration.getMetricsRequestConnectTimeoutMillis()).andReturn(10000); + expect(configuration.getMetricsRequestReadTimeoutMillis()).andReturn(10000); + expect(configuration.getMetricsRequestIntervalReadTimeoutMillis()).andReturn(10000); + // Disable buffer fudge factor + expect(configuration.getMetricRequestBufferTimeCatchupInterval()).andReturn(0l); + + replay(configuration); + + TimelineMetricCacheEntryFactory factory = + createMockBuilder(TimelineMetricCacheEntryFactory.class) + .withConstructor(configuration).createMock(); + + replay(factory); + + long now = System.currentTimeMillis(); + final long existingSeriesStartTime = now - (3600 * 1000); // now - 1 hour + final long existingSeriesEndTime = now; + + // Regular timeseries overlap + long requestedStartTime = existingSeriesStartTime + 60000; // + 1 min + long requestedEndTime = existingSeriesEndTime + 60000; // + 1 min + + long newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime, + existingSeriesEndTime, requestedStartTime); + + long newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime, + existingSeriesEndTime, requestedEndTime); + + Assert.assertEquals(existingSeriesEndTime, newStartTime); + Assert.assertEquals(requestedEndTime, newEndTime); + + // Disconnected timeseries graph + requestedStartTime = existingSeriesEndTime + 60000; // end + 1 min + requestedEndTime = 
existingSeriesEndTime + 60000 + 3600000; // + 1 min + 1 hour + + newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime, + existingSeriesEndTime, requestedStartTime); + + newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime, + existingSeriesEndTime, requestedEndTime); + + Assert.assertEquals(requestedStartTime, newStartTime); + Assert.assertEquals(requestedEndTime, newEndTime); + + // Complete overlap + requestedStartTime = existingSeriesStartTime - 60000; // - 1 min + requestedEndTime = existingSeriesEndTime + 60000; // + 1 min + + newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime, + existingSeriesEndTime, requestedStartTime); + + newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime, + existingSeriesEndTime, requestedEndTime); + + Assert.assertEquals(requestedStartTime, newStartTime); + Assert.assertEquals(requestedEndTime, newEndTime); + + // Timeseries in the past + requestedStartTime = existingSeriesStartTime - 3600000 - 60000; // - 1 hour - 1 min + requestedEndTime = existingSeriesStartTime - 60000; // start - 1 min + + newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime, + existingSeriesEndTime, requestedStartTime); + + newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime, + existingSeriesEndTime, requestedEndTime); + + Assert.assertEquals(requestedStartTime, newStartTime); + Assert.assertEquals(requestedEndTime, newEndTime); + + // Timeseries overlap - no new request needed + requestedStartTime = existingSeriesStartTime + 60000; // + 1 min + requestedEndTime = existingSeriesEndTime - 60000; // - 1 min + + newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime, + existingSeriesEndTime, requestedStartTime); + + newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime, + existingSeriesEndTime, requestedEndTime); + + Assert.assertEquals(newStartTime, existingSeriesEndTime); + Assert.assertEquals(newEndTime, 
existingSeriesStartTime); + + verify(configuration, factory); + + } + + @Test + public void testTimelineMetricCacheTimeseriesUpdates() throws Exception { + Configuration configuration = createNiceMock(Configuration.class); + expect(configuration.getMetricsRequestConnectTimeoutMillis()).andReturn(10000); + expect(configuration.getMetricsRequestReadTimeoutMillis()).andReturn(10000); + expect(configuration.getMetricsRequestIntervalReadTimeoutMillis()).andReturn(10000); + // Disable buffer fudge factor + expect(configuration.getMetricRequestBufferTimeCatchupInterval()).andReturn(0l); + + replay(configuration); + + TimelineMetricCacheEntryFactory factory = + createMockBuilder(TimelineMetricCacheEntryFactory.class) + .withConstructor(configuration).createMock(); + + replay(factory); + + long now = System.currentTimeMillis(); + + // Existing values + + final TimelineMetric timelineMetric1 = new TimelineMetric(); + timelineMetric1.setMetricName("cpu_user"); + timelineMetric1.setAppId("app1"); + Map<Long, Double> metricValues = new TreeMap<Long, Double>(); + metricValues.put(now - 100, 1.0); + metricValues.put(now - 200, 2.0); + metricValues.put(now - 300, 3.0); + timelineMetric1.setMetricValues(metricValues); + final TimelineMetric timelineMetric2 = new TimelineMetric(); + timelineMetric2.setMetricName("cpu_nice"); + timelineMetric2.setAppId("app1"); + metricValues = new TreeMap<Long, Double>(); + metricValues.put(now + 400, 1.0); + metricValues.put(now + 500, 2.0); + metricValues.put(now + 600, 3.0); + timelineMetric2.setMetricValues(metricValues); + + TimelineMetricsCacheValue existingMetricValue = new TimelineMetricsCacheValue( + now - 1000, now + 1000, + new HashMap<String, TimelineMetric>() {{ + put("cpu_user", timelineMetric1); + put("cpu_nice", timelineMetric2); + }}); + + // New values + TimelineMetrics newMetrics = new TimelineMetrics(); + TimelineMetric timelineMetric3 = new TimelineMetric(); + timelineMetric3.setMetricName("cpu_user"); + 
timelineMetric3.setAppId("app1"); + metricValues = new TreeMap<Long, Double>(); + metricValues.put(now + 1400, 1.0); + metricValues.put(now + 1500, 2.0); + metricValues.put(now + 1600, 3.0); + timelineMetric3.setMetricValues(metricValues); + newMetrics.getMetrics().add(timelineMetric3); + + factory.updateTimelineMetricsInCache(newMetrics, existingMetricValue, + now, now + 2000); + + Assert.assertEquals(2, existingMetricValue.getTimelineMetrics().size()); + Assert.assertEquals(3, existingMetricValue.getTimelineMetrics().get("cpu_user").getMetricValues().size()); + Assert.assertEquals(3, existingMetricValue.getTimelineMetrics().get("cpu_nice").getMetricValues().size()); + Map<Long, Double> newMetricsMap = existingMetricValue.getTimelineMetrics().get("cpu_user").getMetricValues(); + Iterator<Long> metricKeyIterator = newMetricsMap.keySet().iterator(); + Assert.assertEquals(now + 1400, metricKeyIterator.next().longValue()); + Assert.assertEquals(now + 1500, metricKeyIterator.next().longValue()); + Assert.assertEquals(now + 1600, metricKeyIterator.next().longValue()); + + verify(configuration, factory); + } + + @Test + public void testEqualsOnKeys() { + long now = System.currentTimeMillis(); + TemporalInfo temporalInfo = new TemporalInfoImpl(now - 1000, now, 1); + + TimelineAppMetricCacheKey key1 = new TimelineAppMetricCacheKey( + new HashSet<String>() {{ add("cpu_num._avg"); add("proc_run._avg"); }}, + "HOST", + temporalInfo + ); + + TimelineAppMetricCacheKey key2 = new TimelineAppMetricCacheKey( + new HashSet<String>() {{ add("cpu_num._avg"); }}, + "HOST", + temporalInfo + ); + + Assert.assertFalse(key1.equals(key2)); + Assert.assertFalse(key2.equals(key1)); + + key2.getMetricNames().add("proc_run._avg"); + + Assert.assertTrue(key1.equals(key2)); + } +}