Repository: ambari
Updated Branches:
  refs/heads/trunk 5b38d8766 -> 02fd9a796


http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
new file mode 100644
index 0000000..ca141d4
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCache.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import net.sf.ehcache.CacheException;
+import net.sf.ehcache.Ehcache;
+import net.sf.ehcache.Element;
+import net.sf.ehcache.constructs.blocking.LockTimeoutException;
+import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory;
+import net.sf.ehcache.constructs.blocking.UpdatingSelfPopulatingCache;
+import org.apache.ambari.server.AmbariException;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TimelineMetricCache extends UpdatingSelfPopulatingCache {
+
+  private final static Logger LOG = 
LoggerFactory.getLogger(TimelineMetricCache.class);
+  private static AtomicInteger printCacheStatsCounter = new AtomicInteger(0);
+
+  /**
+   * Creates a metrics cache that decorates the given backing cache; both
+   * arguments are handed straight to the {@link UpdatingSelfPopulatingCache}
+   * superclass constructor.
+   *
+   * @param cache the backing {@link Ehcache} instance
+   * @param factory the {@link UpdatingCacheEntryFactory} used by the
+   *                superclass to create and update entries
+   * @throws CacheException declared by the superclass constructor
+   */
+  public TimelineMetricCache(Ehcache cache, UpdatingCacheEntryFactory factory) 
throws CacheException {
+    super(cache, factory);
+  }
+
+  /**
+   * Get metrics for an app grouped by the requested TemporalInfo, which is a
+   * part of the {@link TimelineAppMetricCacheKey}.
+   *
+   * @param key cache key carrying the request spec and temporal info
+   * @return a new {@link TimelineMetrics}; its metric list is only populated
+   *         when the cache holds a non-null value for the key
+   * @throws IllegalArgumentException if the key fails validation
+   */
+  public TimelineMetrics getAppTimelineMetricsFromCache(TimelineAppMetricCacheKey key) throws IllegalArgumentException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Fetching metrics with key: " + key);
+    }
+
+    // Make sure key is valid
+    validateKey(key);
+
+    Element element = get(key);
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    // The entry factory may produce a null value (no data from the backend),
+    // so the element's payload has to be checked, not only the element.
+    Object cachedValue = (element != null) ? element.getObjectValue() : null;
+    if (cachedValue != null) {
+      TimelineMetricsCacheValue value = (TimelineMetricsCacheValue) cachedValue;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Returning value from cache: " + value);
+      }
+      timelineMetrics.setMetrics(new ArrayList<TimelineMetric>(value.getTimelineMetrics().values()));
+    }
+
+    if (LOG.isDebugEnabled()) {
+      // Print stats every 100 calls - Note: Supported in debug mode only.
+      // A single atomic increment plus modulo keeps the cadence without the
+      // separate reset step, which could be missed under concurrent calls.
+      if (printCacheStatsCounter.getAndIncrement() % 100 == 0) {
+        LOG.debug("Metrics cache stats => \n" + this.getStatistics());
+      }
+    }
+
+    return timelineMetrics;
+  }
+
+  /**
+   * Set new time bounds on the cache key so that update can use the new
+   * query window. We do this quietly which means regular get/update logic is
+   * not invoked for the lookup that locates the stored key.
+   *
+   * @param key the requested key; cast to {@link TimelineAppMetricCacheKey}
+   * @return the element returned by the superclass {@code get}
+   * @throws LockTimeoutException declared by the superclass {@code get}
+   */
+  @Override
+  public Element get(Object key) throws LockTimeoutException {
+    Element element = this.getQuiet(key);
+    if (element != null) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("key : " + element.getObjectKey());
+        LOG.trace("value : " + element.getObjectValue());
+      }
+
+      // Set new time boundaries on the key instance stored in the cache.
+      // NOTE(review): this relies on temporal info not taking part in the
+      // key's equals/hashCode - confirm against TimelineAppMetricCacheKey.
+      TimelineAppMetricCacheKey existingKey = (TimelineAppMetricCacheKey) element.getObjectKey();
+      TimelineAppMetricCacheKey newKey = (TimelineAppMetricCacheKey) key;
+
+      // Guarded: this runs on every cache hit and the messages stringify
+      // the temporal info and metric names.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Existing temporal info: " + existingKey.getTemporalInfo() +
+          " for : " + existingKey.getMetricNames());
+      }
+
+      existingKey.setTemporalInfo(newKey.getTemporalInfo());
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("New temporal info: " + newKey.getTemporalInfo() +
+          " for : " + existingKey.getMetricNames());
+      }
+    }
+
+    return super.get(key);
+  }
+
+  /**
+   * Verifies that a key carries everything needed to serve a request.
+   *
+   * @param key the key to check
+   * @throws IllegalArgumentException if the key is null, has no temporal
+   *         info, or has no call spec; the message lists every problem found
+   */
+  private void validateKey(TimelineAppMetricCacheKey key) throws IllegalArgumentException {
+    StringBuilder msg = new StringBuilder("Invalid metric key requested.");
+
+    if (key == null) {
+      msg.append(" No key provided.");
+      throw new IllegalArgumentException(msg.toString());
+    }
+
+    boolean throwException = false;
+
+    if (key.getTemporalInfo() == null) {
+      msg.append(" No temporal info provided.");
+      throwException = true;
+    }
+
+    if (key.getSpec() == null) {
+      msg.append(" Missing call spec for metric request.");
+      // A key without a spec cannot be fetched, so it must be rejected too
+      throwException = true;
+    }
+
+    if (throwException) {
+      throw new IllegalArgumentException(msg.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
new file mode 100644
index 0000000..597f037
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheEntryFactory.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory;
+import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.internal.URLStreamProvider;
+import 
org.apache.ambari.server.controller.metrics.timeline.MetricsRequestHelper;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.http.client.utils.URIBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+@Singleton
+public class TimelineMetricCacheEntryFactory implements 
UpdatingCacheEntryFactory {
+  private final static Logger LOG = 
LoggerFactory.getLogger(TimelineMetricCacheEntryFactory.class);
+  // Not declared final to ease unit test code and allow streamProvider
+  // injection
+  private MetricsRequestHelper requestHelperForGets;
+  private MetricsRequestHelper requestHelperForUpdates;
+  private final Long BUFFER_TIME_DIFF_CATCHUP_INTERVAL;
+
+  /**
+   * Builds the two request helpers used by this factory from the server
+   * configuration: one for initial fetches and one for incremental updates.
+   *
+   * @param configuration source of the timeout and catch-up settings
+   */
+  @Inject
+  public TimelineMetricCacheEntryFactory(Configuration configuration) {
+    // Longer timeout for first cache miss
+    URLStreamProvider firstFetchProvider = new URLStreamProvider(
+      configuration.getMetricsRequestConnectTimeoutMillis(),
+      configuration.getMetricsRequestReadTimeoutMillis(),
+      ComponentSSLConfiguration.instance());
+    requestHelperForGets = new MetricsRequestHelper(firstFetchProvider);
+
+    // Timeout setting different from first request timeout
+    // Allows stale data to be returned at the behest of performance.
+    URLStreamProvider refreshProvider = new URLStreamProvider(
+      configuration.getMetricsRequestConnectTimeoutMillis(),
+      configuration.getMetricsRequestIntervalReadTimeoutMillis(),
+      ComponentSSLConfiguration.instance());
+    requestHelperForUpdates = new MetricsRequestHelper(refreshProvider);
+
+    BUFFER_TIME_DIFF_CATCHUP_INTERVAL = configuration.getMetricRequestBufferTimeCatchupInterval();
+  }
+
+  /**
+   * Invoked for a cache miss: fetches the metrics described by the key's
+   * spec and wraps them, together with the key's time window, into a cache
+   * value.
+   *
+   * @param key a {@link TimelineAppMetricCacheKey}
+   * @return a {@link TimelineMetricsCacheValue} indexed by metric name, or
+   *         null when the fetch produced no metrics
+   * @throws Exception propagated from the metrics fetch
+   */
+  @Override
+  public Object createEntry(Object key) throws Exception {
+    LOG.debug("Creating cache entry since none exists, key = " + key);
+    TimelineAppMetricCacheKey cacheKey = (TimelineAppMetricCacheKey) key;
+
+    TimelineMetrics fetched =
+      requestHelperForGets.fetchTimelineMetrics(cacheKey.getSpec());
+
+    // Nothing to cache; a null value should prompt a refresh later
+    if (fetched == null || fetched.getMetrics().isEmpty()) {
+      return null;
+    }
+
+    // Index the series by metric name
+    Map<String, TimelineMetric> metricsByName =
+      new HashMap<String, TimelineMetric>(fetched.getMetrics().size());
+    for (TimelineMetric series : fetched.getMetrics()) {
+      metricsByName.put(series.getMetricName(), series);
+    }
+
+    TemporalInfo window = cacheKey.getTemporalInfo();
+    TimelineMetricsCacheValue entry = new TimelineMetricsCacheValue(
+      window.getStartTime(), window.getEndTime(), metricsByName);
+
+    LOG.debug("Created cache entry: " + entry);
+    return entry;
+  }
+
+  /**
+   * Called on a get call for existing values in the cache,
+   * the necessary locking code is present in the get call and this call
+   * should update the value of the cache entry before returning.
+   *
+   * The cached series is extended in place: only the part of the requested
+   * window that is not already cached is fetched, merged into the existing
+   * value, and the value's boundaries are moved to the requested window.
+   * A failed fetch leaves the cached (possibly stale) value untouched.
+   *
+   * @param key a {@link TimelineAppMetricCacheKey} carrying the new window
+   * @param value the cached {@link TimelineMetricsCacheValue}; may be null
+   *              when entry creation produced no value
+   * @throws Exception e.g. when the key's spec is not a valid URI
+   */
+  @Override
+  public void updateEntryValue(Object key, Object value) throws Exception {
+    TimelineAppMetricCacheKey metricCacheKey = (TimelineAppMetricCacheKey) key;
+    TimelineMetricsCacheValue existingMetrics = (TimelineMetricsCacheValue) value;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Updating cache entry, key: " + key + ", with value = " + value);
+    }
+
+    // createEntry returns null when the backend had nothing to report, so
+    // there may be no cached series to extend.
+    if (existingMetrics == null) {
+      LOG.debug("Skip updating cache, no existing value for key: {}", key);
+      return;
+    }
+
+    Long existingSeriesStartTime = existingMetrics.getStartTime();
+    Long existingSeriesEndTime = existingMetrics.getEndTime();
+
+    TemporalInfo newTemporalInfo = metricCacheKey.getTemporalInfo();
+    Long requestedStartTime = newTemporalInfo.getStartTime();
+    Long requestedEndTime = newTemporalInfo.getEndTime();
+
+    // Calculate new start and end times
+    URIBuilder uriBuilder = new URIBuilder(metricCacheKey.getSpec());
+    Long newStartTime = getRefreshRequestStartTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedStartTime);
+    Long newEndTime = getRefreshRequestEndTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedEndTime);
+
+    // Cover complete overlap scenario
+    // time axis: |-------- exSt ----- reqSt ------ reqEnd ----- exEnd ---------|
+    boolean sameAsCachedWindow = newStartTime.equals(existingSeriesStartTime) &&
+      newEndTime.equals(existingSeriesEndTime);
+
+    if (newEndTime <= newStartTime || sameAsCachedWindow) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip updating cache with new startTime = " +
+          new Date(getMillisecondsTime(newStartTime)) +
+          ", new endTime = " + new Date(getMillisecondsTime(newEndTime)));
+      }
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Existing cached timeseries startTime = " +
+        new Date(getMillisecondsTime(existingSeriesStartTime)) + ", endTime = " +
+        new Date(getMillisecondsTime(existingSeriesEndTime)));
+
+      LOG.debug("Requested timeseries startTime = " +
+        new Date(getMillisecondsTime(newStartTime)) + ", endTime = " +
+        new Date(getMillisecondsTime(newEndTime)));
+    }
+
+    // Update spec with new start and end time
+    uriBuilder.setParameter("startTime", String.valueOf(newStartTime));
+    uriBuilder.setParameter("endTime", String.valueOf(newEndTime));
+
+    try {
+      TimelineMetrics newTimeSeries = requestHelperForUpdates.fetchTimelineMetrics(uriBuilder.toString());
+
+      if (newTimeSeries == null) {
+        // No payload: keep the cached series and its boundaries as they are
+        LOG.debug("No metrics returned for refresh request, key: {}", key);
+        return;
+      }
+
+      // Update existing time series with new values
+      updateTimelineMetricsInCache(newTimeSeries, existingMetrics,
+        getMillisecondsTime(requestedStartTime),
+        getMillisecondsTime(requestedEndTime));
+
+      // Replace old boundary values
+      existingMetrics.setStartTime(requestedStartTime);
+      existingMetrics.setEndTime(requestedEndTime);
+
+    } catch (IOException io) {
+      // Deliberately best-effort: on a failed refresh the stale cached
+      // value keeps being served.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception retrieving metrics.", io);
+      }
+    }
+  }
+
+  /**
+   * Update cache with new timeseries data.
+   *
+   * For every metric in {@code newMetrics} that is already cached, points
+   * outside the requested window are dropped from the cached series and the
+   * new points are merged in. Metrics not cached yet are added as they are.
+   *
+   * @param newMetrics freshly fetched series; null or empty is a no-op
+   * @param timelineMetricsCacheValue cached value that is modified in place
+   * @param requestedStartTime lower bound of the window, in milliseconds
+   * @param requestedEndTime upper bound of the window, in milliseconds
+   */
+  protected void updateTimelineMetricsInCache(TimelineMetrics newMetrics,
+      TimelineMetricsCacheValue timelineMetricsCacheValue,
+      Long requestedStartTime, Long requestedEndTime) {
+
+    // Nothing to merge when the backend returned no payload
+    if (newMetrics == null || newMetrics.getMetrics() == null) {
+      return;
+    }
+
+    Map<String, TimelineMetric> existingTimelineMetricMap = timelineMetricsCacheValue.getTimelineMetrics();
+
+    // NOTE: Metrics names so far are unique, the Map optimization avoids
+    // multiple iterations of the List
+    for (TimelineMetric timelineMetric : newMetrics.getMetrics()) {
+      Map<Long, Double> newMetricValues = timelineMetric.getMetricValues();
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("New metric: " + timelineMetric.getMetricName() +
+          " # " + (newMetricValues == null ? 0 : newMetricValues.size()) +
+          ", " + describeTimeRange(newMetricValues));
+      }
+
+      TimelineMetric existingMetric = existingTimelineMetricMap.get(timelineMetric.getMetricName());
+
+      if (existingMetric == null) {
+        existingTimelineMetricMap.put(timelineMetric.getMetricName(), timelineMetric);
+        continue;
+      }
+
+      Map<Long, Double> existingMetricValues = existingMetric.getMetricValues();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Existing metric: " + timelineMetric.getMetricName() +
+          " # " + existingMetricValues.size());
+      }
+
+      // Remove old values
+      // Assumption: All return value are millis
+      Iterator<Map.Entry<Long, Double>> valueIterator = existingMetricValues.entrySet().iterator();
+      while (valueIterator.hasNext()) {
+        Map.Entry<Long, Double> metricEntry = valueIterator.next();
+        if (metricEntry.getKey() < requestedStartTime
+            || metricEntry.getKey() > requestedEndTime) {
+          valueIterator.remove();
+        }
+      }
+
+      // Add new ones
+      if (newMetricValues != null) {
+        existingMetricValues.putAll(newMetricValues);
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Merged metric: " + timelineMetric.getMetricName() + ", " +
+          "Final size: " + existingMetricValues.size() + ", " +
+          describeTimeRange(existingMetricValues));
+      }
+    }
+  }
+
+  /**
+   * Formats the first and last timestamp of a series for trace logging;
+   * safe for null and empty series.
+   */
+  private static String describeTimeRange(Map<Long, Double> values) {
+    if (values == null || values.isEmpty()) {
+      return "startTime = null, endTime = null";
+    }
+    TreeMap<Long, Double> sortedMetrics = new TreeMap<Long, Double>(values);
+    return "startTime = " + sortedMetrics.firstKey() +
+      ", endTime = " + sortedMetrics.lastKey();
+  }
+
+  /**
+   * Picks the start time for a refresh request.
+   *
+   * Scenario: Regular graph updates
+   * time axis: |-------- exSt ----- reqSt ------ exEnd ----- reqEnd ---------|
+   * Scenario: Selective graph updates
+   * time axis: |-------- exSt ----- exEnd ------ reqSt ----- reqEnd ---------|
+   * Scenario: Extended time window
+   * time axis: |-------- reqSt ---- exSt ------- exEnd ----- reqEnd ---------|
+   *
+   * @return the requested start time, unless it falls strictly inside the
+   *         cached window, in which case the time-shifted cached end time
+   */
+  protected Long getRefreshRequestStartTime(Long existingSeriesStartTime,
+      Long existingSeriesEndTime, Long requestedStartTime) {
+    long offsetFromCachedEnd = requestedStartTime - existingSeriesEndTime;
+    Long refreshStart = requestedStartTime;
+
+    // Regular graph updates: overlapping timeseries data, refresh only the
+    // new part and account for missing data on the trailing edge due to
+    // buffering
+    boolean startsInsideCachedWindow =
+      offsetFromCachedEnd < 0 && requestedStartTime > existingSeriesStartTime;
+    if (startsInsideCachedWindow) {
+      refreshStart = getTimeShiftedStartTime(existingSeriesEndTime);
+    }
+
+    LOG.trace("Requesting timeseries data with new startTime = " +
+      new Date(getMillisecondsTime(refreshStart)));
+
+    return refreshStart;
+  }
+
+  /**
+   * Picks the end time for a refresh request.
+   *
+   * Scenario: Regular graph updates
+   * time axis: |-------- exSt ----- reqSt ------ exEnd ----- reqEnd ---------|
+   * Scenario: Old data request /w overlap
+   * time axis: |-------- reqSt ----- exSt ------ reqEnd ----- exEnd ---------|
+   * Scenario: Very Old data request /wo overlap
+   * time axis: |-------- reqSt ----- reqEnd ------ exSt ----- exEnd ---------|
+   *
+   * @return the requested end time, unless it falls strictly inside the
+   *         cached window, in which case the cached start time
+   */
+  protected Long getRefreshRequestEndTime(Long existingSeriesStartTime,
+      Long existingSeriesEndTime, Long requestedEndTime) {
+    Long refreshEnd = requestedEndTime;
+    long offsetFromCachedEnd = requestedEndTime - existingSeriesEndTime;
+
+    // End time overlaps existing timeseries: get only older data that
+    // might not be in the cache
+    boolean endsInsideCachedWindow =
+      offsetFromCachedEnd < 0 && requestedEndTime > existingSeriesStartTime;
+    if (endsInsideCachedWindow) {
+      refreshEnd = existingSeriesStartTime;
+    }
+
+    LOG.trace("Requesting timeseries data with new endTime = " +
+      new Date(getMillisecondsTime(refreshEnd)));
+    return refreshEnd;
+  }
+
+  /**
+   * Moves a start time back by the configured catch-up interval. Values
+   * below 9999999999 are treated as epoch seconds and shifted by the
+   * interval divided by 1000; larger values are shifted by the interval
+   * itself.
+   */
+  private long getTimeShiftedStartTime(long startTime) {
+    boolean isEpochSeconds = startTime < 9999999999L;
+    long shift = isEpochSeconds
+      ? BUFFER_TIME_DIFF_CATCHUP_INTERVAL / 1000
+      : BUFFER_TIME_DIFF_CATCHUP_INTERVAL;
+    return startTime - shift;
+  }
+
+  /**
+   * Returns the time in milliseconds: values below 9999999999 are taken to
+   * be seconds and multiplied by 1000, anything else is returned unchanged.
+   */
+  private long getMillisecondsTime(long time) {
+    return (time < 9999999999L) ? time * 1000 : time;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheProvider.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheProvider.java
new file mode 100644
index 0000000..8df957e
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheProvider.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.config.CacheConfiguration;
+import net.sf.ehcache.store.MemoryStoreEvictionPolicy;
+import org.apache.ambari.server.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Cache implementation that provides ability to perform incremental reads
+ * from Metrics backend and reduce the amount of calls between Ambari and the
+ * Metrics backend.
+ */
+@Singleton
+public class TimelineMetricCacheProvider {
+  private TimelineMetricCache timelineMetricsCache;
+  private volatile boolean isCacheInitialized = false;
+  public static final String TIMELINE_METRIC_CACHE_INSTANCE_NAME = 
"timelineMetricCache";
+
+  Configuration configuration;
+  TimelineMetricCacheEntryFactory cacheEntryFactory;
+
+  private final static Logger LOG = 
LoggerFactory.getLogger(TimelineMetricCacheProvider.class);
+
+  /**
+   * Stores the collaborators; the cache itself is not created here but on
+   * the first call to {@code getTimelineMetricsCache()}.
+   *
+   * @param configuration source of the cache settings
+   * @param cacheEntryFactory factory passed to the cache when it is created
+   */
+  @Inject
+  public TimelineMetricCacheProvider(Configuration configuration,
+                                     TimelineMetricCacheEntryFactory 
cacheEntryFactory) {
+    this.configuration = configuration;
+    this.cacheEntryFactory = cacheEntryFactory;
+  }
+
+  /**
+   * Creates the metrics cache from the configured limits and registers it
+   * with the singleton CacheManager. Does nothing when the cache has been
+   * initialized already.
+   */
+  private synchronized void initializeCache() {
+    // Re-check under the lock: another caller may have completed the
+    // initialization in the meantime (avoids ObjectExistsException)
+    if (isCacheInitialized) {
+      return;
+    }
+
+    // Create a singleton CacheManager using defaults
+    System.setProperty("net.sf.ehcache.skipUpdateCheck", "true");
+    CacheManager manager = CacheManager.getInstance();
+
+    LOG.info("Creating Metrics Cache with timeouts => ttl = " +
+      configuration.getMetricCacheTTLSeconds() + ", idle = " +
+      configuration.getMetricCacheIdleSeconds());
+
+    // Size and expiry settings are taken from the server configuration;
+    // the cache is memory-only with LRU eviction
+    CacheConfiguration cacheConfiguration =
+      new CacheConfiguration(TIMELINE_METRIC_CACHE_INSTANCE_NAME, configuration.getMetricCacheMaxEntries())
+        .timeToLiveSeconds(configuration.getMetricCacheTTLSeconds())
+        .timeToIdleSeconds(configuration.getMetricCacheIdleSeconds())
+        .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU)
+        .eternal(false)
+        .diskPersistent(false)
+        .overflowToDisk(false)
+        .statistics(LOG.isDebugEnabled() || LOG.isTraceEnabled());
+    Cache cache = new Cache(cacheConfiguration);
+
+    timelineMetricsCache = new TimelineMetricCache(cache, cacheEntryFactory);
+
+    LOG.info("Registering metrics cache with provider: name = " +
+      cache.getName() + ", guid: " + cache.getGuid());
+
+    manager.addCache(timelineMetricsCache);
+
+    // Set last so the flag is only visible once the cache is registered
+    isCacheInitialized = true;
+  }
+
+  /**
+   * Returns the metrics cache, creating it on first use.
+   *
+   * @return the {@link TimelineMetricCache}, or null if caching is disabled
+   *         through config
+   */
+  public TimelineMetricCache getTimelineMetricsCache() {
+    if (!configuration.isMetricsCacheDisabled()) {
+      if (!isCacheInitialized) {
+        initializeCache();
+      }
+      return timelineMetricsCache;
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java
 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java
new file mode 100644
index 0000000..f9f1f54
--- /dev/null
+++ 
b/ambari-server/src/main/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricsCacheValue.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Wrapper object for metrics returned from AMS that includes the query time
+ * window. Instances are mutable: the window and the metric map can be
+ * replaced through the setters.
+ */
+public class TimelineMetricsCacheValue {
+  // Times below this value are treated as seconds rather than milliseconds
+  private static final long SECONDS_THRESHOLD = 9999999999L;
+
+  private Long startTime;
+  private Long endTime;
+  // Metric name -> metric; works on the assumption that names are unique
+  private Map<String, TimelineMetric> timelineMetrics;
+
+  /**
+   * @param startTime start of the query window
+   * @param endTime end of the query window
+   * @param timelineMetrics map of metric name to metric
+   */
+  public TimelineMetricsCacheValue(Long startTime, Long endTime, Map<String, TimelineMetric> timelineMetrics) {
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.timelineMetrics = timelineMetrics;
+  }
+
+  /**
+   * @return the stored map itself (not a copy) of metric name to metric
+   */
+  public Map<String, TimelineMetric> getTimelineMetrics() {
+    return timelineMetrics;
+  }
+
+  /**
+   * Map of metricName to metric values. Works on the assumption that metric
+   * name is unique
+   */
+  public void setTimelineMetrics(Map<String, TimelineMetric> timelineMetrics) {
+    this.timelineMetrics = timelineMetrics;
+  }
+
+  public Long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(Long startTime) {
+    this.startTime = startTime;
+  }
+
+  public Long getEndTime() {
+    return endTime;
+  }
+
+  public void setEndTime(Long endTime) {
+    this.endTime = endTime;
+  }
+
+  /**
+   * Returns the time in milliseconds: values below the threshold are
+   * multiplied by 1000, anything else is returned unchanged.
+   */
+  private long getMillisecondsTime(long time) {
+    if (time < SECONDS_THRESHOLD) {
+      return time * 1000;
+    } else {
+      return time;
+    }
+  }
+
+  /**
+   * Converts a window boundary to a Date for display; null stays null so
+   * that toString() never fails on an unset boundary.
+   */
+  private Date toDate(Long time) {
+    return (time == null) ? null : new Date(getMillisecondsTime(time));
+  }
+
+  /**
+   * Summarizes the value: metric names, window, and the number of points
+   * per metric. Safe to call when the map or a boundary is null, since this
+   * is used from log statements.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("TimelineMetricsCacheValue {" +
+      "metricNames = " + (timelineMetrics == null ? null : timelineMetrics.keySet()) +
+      ", startTime = " + toDate(startTime) +
+      ", endTime = " + toDate(endTime) +
+      ", timelineMetrics =");
+
+    if (timelineMetrics != null) {
+      for (TimelineMetric metric : timelineMetrics.values()) {
+        sb.append(" { ");
+        sb.append(metric.getMetricName());
+        sb.append(" # ");
+        sb.append(metric.getMetricValues().size());
+        sb.append(" }");
+      }
+    }
+    sb.append("}");
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
index b8e0596..3ba79ca 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/StackDefinedPropertyProviderTest.java
@@ -19,6 +19,7 @@ package org.apache.ambari.server.controller.internal;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,11 +28,18 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.jmx.TestStreamProvider;
 import org.apache.ambari.server.controller.metrics.JMXPropertyProviderTest;
 import org.apache.ambari.server.controller.metrics.MetricsServiceProvider;
 import 
org.apache.ambari.server.controller.metrics.ganglia.GangliaPropertyProviderTest.TestGangliaHostProvider;
 import 
org.apache.ambari.server.controller.metrics.ganglia.GangliaPropertyProviderTest.TestGangliaServiceProvider;
+import 
org.apache.ambari.server.controller.metrics.timeline.MetricsRequestHelper;
+import 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
+import 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.controller.spi.Predicate;
 import org.apache.ambari.server.controller.spi.PropertyProvider;
 import org.apache.ambari.server.controller.spi.Request;
@@ -52,6 +60,7 @@ import org.apache.ambari.server.state.stack.Metric;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.inject.Guice;
@@ -70,11 +79,29 @@ public class StackDefinedPropertyProviderTest {
   private Injector injector = null;
   private OrmTestHelper helper = null;
 
+  private static TimelineMetricCacheEntryFactory cacheEntryFactory;
+  private static TimelineMetricCacheProvider cacheProvider;
+
+  /**
+   * Creates the cache entry factory and cache provider once for the whole
+   * test class; the provider is then bound into each test's injector.
+   */
+  @BeforeClass
+  public static void setupCache() {
+    cacheEntryFactory = new TimelineMetricCacheEntryFactory(new 
Configuration());
+    cacheProvider = new TimelineMetricCacheProvider(new Configuration(), 
cacheEntryFactory);
+  }
+
+  /**
+   * Guice module that binds {@link TimelineMetricCacheProvider} to the
+   * class-wide provider instance. Declared static because it only touches
+   * static state and needs no reference to the enclosing test instance.
+   */
+  public static class TestModuleWithCacheProvider implements Module {
+    @Override
+    public void configure(Binder binder) {
+      binder.bind(TimelineMetricCacheProvider.class).toInstance(cacheProvider);
+    }
+  }
+
   @Before
   public void setup() throws Exception {
     InMemoryDefaultTestModule module = new InMemoryDefaultTestModule();
-
-    injector = Guice.createInjector(module);
+    // Use the same cache provider to ensure there is only one instance of
+    // Cache available. The @net.sf.ehcache.CacheManager is a singleton and
+    // does not allow multiple instances with same cache name to be registered.
+    injector = Guice.createInjector(Modules.override(module).with(new 
TestModuleWithCacheProvider()));
     injector.getInstance(GuiceJpaInitializer.class);
     StackDefinedPropertyProvider.init(injector);
 
@@ -404,10 +431,10 @@ public class StackDefinedPropertyProviderTest {
     Assert.assertEquals(12,   
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default",
 "AggregateContainersReleased")));
     Assert.assertEquals(8192, 
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default",
 "AvailableMB")));
     Assert.assertEquals(1,    
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default",
 "AvailableVCores")));
-    Assert.assertEquals(47,   
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default",
 "AppsSubmitted")));
+    Assert.assertEquals(47, 
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/default",
 "AppsSubmitted")));
 
-    Assert.assertEquals(4,    
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue",
 "AggregateContainersAllocated")));
-    Assert.assertEquals(4,    
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue",
 "AggregateContainersReleased")));
+    Assert.assertEquals(4, 
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue",
 "AggregateContainersAllocated")));
+    Assert.assertEquals(4, 
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue",
 "AggregateContainersReleased")));
     Assert.assertEquals(6048, 
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue",
 "AvailableMB")));
     Assert.assertEquals(1,    
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue",
 "AvailableVCores")));
     Assert.assertEquals(1,    
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/yarn/Queue/root/second_queue",
 "AppsSubmitted")));
@@ -688,8 +715,6 @@ public class StackDefinedPropertyProviderTest {
     Assert.assertEquals(8444, 
resource.getPropertyValue(PropertyHelper.getPropertyId("metrics/dfs/journalnode/cluster/mycluster",
 "lastWrittenTxId")));
   }
 
-
-
   @Test
   public void testPopulateResources_jmx_Storm() throws Exception {
     // Adjust stack version for cluster
@@ -818,7 +843,6 @@ public class StackDefinedPropertyProviderTest {
     
Assert.assertTrue(map.get("metrics/hbase/master").containsKey("IsActiveMaster"));
   }
 
-
   @Test
   public void testPopulateResources_params_category5() throws Exception {
     org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider 
streamProvider =
@@ -1042,7 +1066,7 @@ public class StackDefinedPropertyProviderTest {
 
     org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider 
streamProvider =
       new 
org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider("ams/aggregate_component_metric.json");
-
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     JMXPropertyProviderTest.TestJMXHostProvider jmxHostProvider = new 
JMXPropertyProviderTest.TestJMXHostProvider(true);
     TestGangliaHostProvider hostProvider = new TestGangliaHostProvider();
     MetricsServiceProvider serviceProvider = new MetricsServiceProvider() {
@@ -1085,4 +1109,10 @@ public class StackDefinedPropertyProviderTest {
     Assert.assertEquals(32, metricsArray.length);
   }
 
+  /* Since streamProviders are not injected this hack becomes necessary */
+  private void injectCacheEntryFactoryWithStreamProvider(StreamProvider 
streamProvider) throws Exception {
+    Field field = 
TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets");
+    field.setAccessible(true);
+    field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider));
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
index c8007c8..71febc9 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSPropertyProviderTest.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.metrics.timeline;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.AmbariServer;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
@@ -27,29 +28,34 @@ import 
org.apache.ambari.server.controller.internal.ResourceImpl;
 import org.apache.ambari.server.controller.internal.TemporalInfoImpl;
 import org.apache.ambari.server.controller.metrics.MetricHostProvider;
 import org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider;
+import 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
+import 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ComponentInfo;
 import org.apache.ambari.server.state.StackId;
 import org.apache.http.client.utils.URIBuilder;
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -66,6 +72,7 @@ import static org.mockito.Mockito.mock;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({AMSPropertyProvider.class, AmbariServer.class})
+@PowerMockIgnore({"javax.xml.parsers.*", "org.xml.sax.*", "net.sf.ehcache.*", 
"org.apache.log4j.*"})
 public class AMSPropertyProviderTest {
   private static final String PROPERTY_ID1 = 
PropertyHelper.getPropertyId("metrics/cpu", "cpu_user");
   private static final String PROPERTY_ID2 = 
PropertyHelper.getPropertyId("metrics/memory", "mem_free");
@@ -82,10 +89,20 @@ public class AMSPropertyProviderTest {
   private static final String EMBEDDED_METRICS_FILE_PATH = FILE_PATH_PREFIX + 
"embedded_host_metric.json";
   private static final String AGGREGATE_METRICS_FILE_PATH = FILE_PATH_PREFIX + 
"aggregate_component_metric.json";
 
+  private static TimelineMetricCacheEntryFactory cacheEntryFactory;
+  private static TimelineMetricCacheProvider cacheProvider;
+
+  @BeforeClass
+  public static void setupCache() {
+    cacheEntryFactory = new TimelineMetricCacheEntryFactory(new 
Configuration());
+    cacheProvider = new TimelineMetricCacheProvider(new Configuration(), 
cacheEntryFactory);
+  }
+
   @Test
   public void testPopulateResourcesForSingleHostMetric() throws Exception {
     setUpCommonMocks();
     TestStreamProvider streamProvider = new 
TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -94,6 +111,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       HOST_NAME_PROPERTY_ID
@@ -129,6 +147,7 @@ public class AMSPropertyProviderTest {
 
     // given
     TestStreamProvider streamProvider = new 
TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
     Map<String, Map<String, PropertyInfo>> propertyIds = 
PropertyHelper.getMetricPropertyIds(Resource.Type.Host);
@@ -136,6 +155,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       HOST_NAME_PROPERTY_ID
@@ -169,6 +189,7 @@ public class AMSPropertyProviderTest {
   public void testPopulateResourcesForMultipleHostMetricscPointInTime() throws 
Exception {
     setUpCommonMocks();
     TestStreamProvider streamProvider = new 
TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -177,6 +198,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       HOST_NAME_PROPERTY_ID
@@ -217,6 +239,7 @@ public class AMSPropertyProviderTest {
   public void testPopulateResourcesForMultipleHostMetrics() throws Exception {
     setUpCommonMocks();
     TestStreamProvider streamProvider = new 
TestStreamProvider(MULTIPLE_HOST_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -225,6 +248,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       HOST_NAME_PROPERTY_ID
@@ -276,6 +300,7 @@ public class AMSPropertyProviderTest {
   public void testPopulateResourcesForRegexpMetrics() throws Exception {
     setUpCommonMocks();
     TestStreamProvider streamProvider = new 
TestStreamProvider(MULTIPLE_COMPONENT_REGEXP_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -291,6 +316,7 @@ public class AMSPropertyProviderTest {
         propertyIds,
         streamProvider,
         sslConfiguration,
+        cacheProvider,
         metricHostProvider,
         CLUSTER_NAME_PROPERTY_ID,
         COMPONENT_NAME_PROPERTY_ID
@@ -327,6 +353,7 @@ public class AMSPropertyProviderTest {
   public void testPopulateResourcesForSingleComponentMetric() throws Exception 
{
     setUpCommonMocks();
     TestStreamProvider streamProvider = new 
TestStreamProvider(SINGLE_COMPONENT_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -337,6 +364,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       COMPONENT_NAME_PROPERTY_ID
@@ -394,6 +422,7 @@ public class AMSPropertyProviderTest {
     PowerMock.replayAll();
 
     TestStreamProvider streamProvider = new 
TestStreamProvider(EMBEDDED_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -404,6 +433,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       COMPONENT_NAME_PROPERTY_ID
@@ -460,6 +490,7 @@ public class AMSPropertyProviderTest {
     PowerMock.replayAll();
 
     TestStreamProvider streamProvider = new 
TestStreamProvider(AGGREGATE_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -471,6 +502,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       COMPONENT_NAME_PROPERTY_ID
@@ -504,6 +536,7 @@ public class AMSPropertyProviderTest {
   public void testFilterOutOfBandMetricData() throws Exception {
     setUpCommonMocks();
     TestStreamProvider streamProvider = new 
TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -512,6 +545,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       HOST_NAME_PROPERTY_ID
@@ -571,6 +605,7 @@ public class AMSPropertyProviderTest {
     setUpCommonMocks();
     TestStreamProviderForHostComponentHostMetricsTest streamProvider =
       new TestStreamProviderForHostComponentHostMetricsTest(null);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -579,6 +614,7 @@ public class AMSPropertyProviderTest {
       propertyIds,
       streamProvider,
       sslConfiguration,
+      cacheProvider,
       metricHostProvider,
       CLUSTER_NAME_PROPERTY_ID,
       HOST_NAME_PROPERTY_ID,
@@ -696,7 +732,16 @@ public class AMSPropertyProviderTest {
     
expect(ambariMetaInfo.getComponent(anyObject(String.class),anyObject(String.class),
             anyObject(String.class), anyObject(String.class)))
             .andReturn(componentInfo).anyTimes();
+
     replay(ams, clusters, cluster, ambariMetaInfo);
     PowerMock.replayAll();
   }
+
+  /* Since streamProviders are not injected this hack becomes necessary */
+  private void injectCacheEntryFactoryWithStreamProvider(StreamProvider 
streamProvider) throws Exception {
+    Field field = 
TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets");
+    field.setAccessible(true);
+    field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
index 3ee64fa..99a2102 100644
--- 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/AMSReportPropertyProviderTest.java
@@ -18,18 +18,24 @@
 package org.apache.ambari.server.controller.metrics.timeline;
 
 import org.apache.ambari.server.configuration.ComponentSSLConfiguration;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.internal.PropertyInfo;
 import org.apache.ambari.server.controller.internal.ResourceImpl;
 import org.apache.ambari.server.controller.internal.TemporalInfoImpl;
 import org.apache.ambari.server.controller.metrics.ganglia.TestStreamProvider;
+import 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheEntryFactory;
+import 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
 import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.TemporalInfo;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.controller.utilities.StreamProvider;
 import org.apache.http.client.utils.URIBuilder;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import java.io.File;
+import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -44,9 +50,19 @@ public class AMSReportPropertyProviderTest {
   private static final String SINGLE_HOST_METRICS_FILE_PATH = FILE_PATH_PREFIX 
+ "single_host_metric.json";
   private static final String AGGREGATE_CLUSTER_METRICS_FILE_PATH = 
FILE_PATH_PREFIX + "aggregate_cluster_metrics.json";
 
+  private static TimelineMetricCacheEntryFactory cacheEntryFactory;
+  private static TimelineMetricCacheProvider cacheProvider;
+
+  @BeforeClass
+  public static void setupCache() {
+    cacheEntryFactory = new TimelineMetricCacheEntryFactory(new 
Configuration());
+    cacheProvider = new TimelineMetricCacheProvider(new Configuration(), 
cacheEntryFactory);
+  }
+
   @Test
   public void testPopulateResources() throws Exception {
     TestStreamProvider streamProvider = new 
TestStreamProvider(SINGLE_HOST_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -58,6 +74,7 @@ public class AMSReportPropertyProviderTest {
         propertyIds,
         streamProvider,
         sslConfiguration,
+        cacheProvider,
         metricHostProvider,
         CLUSTER_NAME_PROPERTY_ID
     );
@@ -88,6 +105,7 @@ public class AMSReportPropertyProviderTest {
   @Test
   public void testPopulateResourceWithAggregateFunction() throws Exception {
     TestStreamProvider streamProvider = new 
TestStreamProvider(AGGREGATE_CLUSTER_METRICS_FILE_PATH);
+    injectCacheEntryFactoryWithStreamProvider(streamProvider);
     TestMetricHostProvider metricHostProvider = new TestMetricHostProvider();
     ComponentSSLConfiguration sslConfiguration = 
mock(ComponentSSLConfiguration.class);
 
@@ -99,6 +117,7 @@ public class AMSReportPropertyProviderTest {
         propertyIds,
         streamProvider,
         sslConfiguration,
+        cacheProvider,
         metricHostProvider,
         CLUSTER_NAME_PROPERTY_ID
     );
@@ -125,4 +144,11 @@ public class AMSReportPropertyProviderTest {
     Number[][] val = (Number[][]) 
res.getPropertyValue("metrics/cpu/User._sum");
     Assert.assertEquals(90, val.length);
   }
+
+  /* Since streamProviders are not injected this hack becomes necessary */
+  private void injectCacheEntryFactoryWithStreamProvider(StreamProvider 
streamProvider) throws Exception {
+    Field field = 
TimelineMetricCacheEntryFactory.class.getDeclaredField("requestHelperForGets");
+    field.setAccessible(true);
+    field.set(cacheEntryFactory, new MetricsRequestHelper(streamProvider));
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/02fd9a79/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java
 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java
new file mode 100644
index 0000000..b16024b
--- /dev/null
+++ 
b/ambari-server/src/test/java/org/apache/ambari/server/controller/metrics/timeline/cache/TimelineMetricCacheTest.java
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.metrics.timeline.cache;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import junit.framework.Assert;
+import net.sf.ehcache.Cache;
+import net.sf.ehcache.CacheManager;
+import net.sf.ehcache.constructs.blocking.UpdatingCacheEntryFactory;
+import net.sf.ehcache.constructs.blocking.UpdatingSelfPopulatingCache;
+import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.controller.internal.TemporalInfoImpl;
+import org.apache.ambari.server.controller.spi.TemporalInfo;
+import org.apache.ambari.server.state.stack.OsFamily;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static 
org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider.TIMELINE_METRIC_CACHE_INSTANCE_NAME;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.getCurrentArguments;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class TimelineMetricCacheTest {
+
+  private TimelineMetricCacheProvider getMetricCacheProvider(
+      final Configuration configuration,
+      final TimelineMetricCacheEntryFactory cacheEntryFactory) {
+
+    Injector injector = Guice.createInjector(new Module() {
+      @Override
+      public void configure(Binder binder) {
+        binder.bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class));
+        binder.bind(Configuration.class).toInstance(configuration);
+        
binder.bind(TimelineMetricCacheEntryFactory.class).toInstance(cacheEntryFactory);
+      }
+    });
+    return injector.getInstance(TimelineMetricCacheProvider.class);
+  }
+
+  @After
+  public void removeCacheInstance() {
+    // Avoids Object Exists Exception on unit tests: remove the cache after
+    // each test so that every provider can add a new cache of the same name.
+    CacheManager manager = CacheManager.getInstance();
+    manager.removeCache(TIMELINE_METRIC_CACHE_INSTANCE_NAME);
+  }
+
+  // General cache behavior demonstration
+  @Test
+  public void testSelfPopulatingCacheUpdates() throws Exception {
+    UpdatingCacheEntryFactory cacheEntryFactory = 
createMock(UpdatingCacheEntryFactory.class);
+
+    StringBuilder value = new StringBuilder("b");
+
+    expect(cacheEntryFactory.createEntry("a")).andReturn(value);
+    cacheEntryFactory.updateEntryValue("a", value);
+    expectLastCall().andAnswer(new IAnswer<Object>() {
+      @Override
+      public Object answer() throws Throwable {
+        String key = (String) getCurrentArguments()[0];
+        StringBuilder value = (StringBuilder) getCurrentArguments()[1];
+        System.out.println("key = " + key + ", value = " + value);
+        value.append("c");
+        return null;
+      }
+    });
+
+    replay(cacheEntryFactory);
+
+    CacheManager manager = CacheManager.getInstance();
+    Cache cache = new Cache("test", 10, false, false, 10000, 10000);
+    UpdatingSelfPopulatingCache testCache = new 
UpdatingSelfPopulatingCache(cache, cacheEntryFactory);
+    manager.addCache(testCache);
+
+    Assert.assertEquals("b", testCache.get("a").getObjectValue().toString());
+    Assert.assertEquals("bc", testCache.get("a").getObjectValue().toString());
+
+    verify(cacheEntryFactory);
+  }
+
+  @Test
+  public void testTimlineMetricCacheProviderGets() throws Exception {
+    Configuration configuration = createNiceMock(Configuration.class);
+    expect(configuration.getMetricCacheMaxEntries()).andReturn(1000);
+    expect(configuration.getMetricCacheTTLSeconds()).andReturn(3600);
+    expect(configuration.getMetricCacheIdleSeconds()).andReturn(100);
+
+    final long now = System.currentTimeMillis();
+    Map<String, TimelineMetric> valueMap = new HashMap<String, 
TimelineMetric>();
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName("cpu_user");
+    timelineMetric.setAppId("app1");
+    Map<Long, Double> metricValues = new HashMap<Long, Double>();
+    metricValues.put(now + 100, 1.0);
+    metricValues.put(now + 200, 2.0);
+    metricValues.put(now + 300, 3.0);
+    timelineMetric.setMetricValues(metricValues);
+    valueMap.put("cpu_user", timelineMetric);
+
+    TimelineMetricCacheEntryFactory cacheEntryFactory = 
createMock(TimelineMetricCacheEntryFactory.class);
+
+    TimelineAppMetricCacheKey queryKey = new TimelineAppMetricCacheKey(
+      Collections.singleton("cpu_user"),
+      "app1",
+      new TemporalInfoImpl(now, now + 1000, 1)
+    );
+    TimelineMetricsCacheValue value = new TimelineMetricsCacheValue(now, now + 
1000, valueMap);
+    TimelineAppMetricCacheKey testKey = new TimelineAppMetricCacheKey(
+      Collections.singleton("cpu_user"),
+      "app1",
+      new TemporalInfoImpl(now, now + 2000, 1)
+    );
+
+    expect(cacheEntryFactory.createEntry(anyObject())).andReturn(value);
+    cacheEntryFactory.updateEntryValue(testKey, value);
+    expectLastCall().once();
+
+    replay(configuration, cacheEntryFactory);
+
+    TimelineMetricCacheProvider cacheProvider = 
getMetricCacheProvider(configuration, cacheEntryFactory);
+    TimelineMetricCache cache = cacheProvider.getTimelineMetricsCache();
+
+    // call to get
+    TimelineMetrics metrics = cache.getAppTimelineMetricsFromCache(queryKey);
+    List<TimelineMetric> metricsList = metrics.getMetrics();
+    Assert.assertEquals(1, metricsList.size());
+    TimelineMetric metric = metricsList.iterator().next();
+    Assert.assertEquals("cpu_user", metric.getMetricName());
+    Assert.assertEquals("app1", metric.getAppId());
+    Assert.assertSame(metricValues, metric.getMetricValues());
+
+    // call to update with new key
+    metrics = cache.getAppTimelineMetricsFromCache(testKey);
+    metricsList = metrics.getMetrics();
+    Assert.assertEquals(1, metricsList.size());
+    Assert.assertEquals("cpu_user", metric.getMetricName());
+    Assert.assertEquals("app1", metric.getAppId());
+    Assert.assertSame(metricValues, metric.getMetricValues());
+
+    verify(configuration, cacheEntryFactory);
+  }
+
+  @Test
+  @SuppressWarnings("all")
+  public void testCacheUpdateBoundsOnVariousRequestScenarios() throws 
Exception {
+    Configuration configuration = createNiceMock(Configuration.class);
+    
expect(configuration.getMetricsRequestConnectTimeoutMillis()).andReturn(10000);
+    
expect(configuration.getMetricsRequestReadTimeoutMillis()).andReturn(10000);
+    
expect(configuration.getMetricsRequestIntervalReadTimeoutMillis()).andReturn(10000);
+    // Disable buffer fudge factor
+    
expect(configuration.getMetricRequestBufferTimeCatchupInterval()).andReturn(0l);
+
+    replay(configuration);
+
+    TimelineMetricCacheEntryFactory factory =
+      createMockBuilder(TimelineMetricCacheEntryFactory.class)
+        .withConstructor(configuration).createMock();
+
+    replay(factory);
+
+    long now = System.currentTimeMillis();
+    final long existingSeriesStartTime = now - (3600 * 1000); // now - 1 hour
+    final long existingSeriesEndTime = now;
+
+    // Regular timeseries overlap
+    long requestedStartTime = existingSeriesStartTime + 60000; // + 1 min
+    long requestedEndTime = existingSeriesEndTime + 60000; // + 1 min
+
+    long newStartTime = 
factory.getRefreshRequestStartTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedStartTime);
+
+    long newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedEndTime);
+
+    Assert.assertEquals(existingSeriesEndTime, newStartTime);
+    Assert.assertEquals(requestedEndTime, newEndTime);
+
+    // Disconnected timeseries graph
+    requestedStartTime = existingSeriesEndTime + 60000; // end + 1 min
+    requestedEndTime = existingSeriesEndTime + 60000 + 3600000; // + 1 min + 1 
hour
+
+    newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedStartTime);
+
+    newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedEndTime);
+
+    Assert.assertEquals(requestedStartTime, newStartTime);
+    Assert.assertEquals(requestedEndTime, newEndTime);
+
+    // Complete overlap
+    requestedStartTime = existingSeriesStartTime - 60000; // - 1 min
+    requestedEndTime = existingSeriesEndTime + 60000; // + 1 min
+
+    newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedStartTime);
+
+    newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedEndTime);
+
+    Assert.assertEquals(requestedStartTime, newStartTime);
+    Assert.assertEquals(requestedEndTime, newEndTime);
+
+    // Timeseries in the past
+    requestedStartTime = existingSeriesStartTime - 3600000 - 60000; // - 1 
hour - 1 min
+    requestedEndTime = existingSeriesStartTime - 60000; // start - 1 min
+
+    newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedStartTime);
+
+    newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedEndTime);
+
+    Assert.assertEquals(requestedStartTime, newStartTime);
+    Assert.assertEquals(requestedEndTime, newEndTime);
+
+    // Timeseries overlap - no new request needed
+    requestedStartTime = existingSeriesStartTime + 60000; // + 1 min
+    requestedEndTime = existingSeriesEndTime - 60000; // - 1 min
+
+    newStartTime = factory.getRefreshRequestStartTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedStartTime);
+
+    newEndTime = factory.getRefreshRequestEndTime(existingSeriesStartTime,
+      existingSeriesEndTime, requestedEndTime);
+
+    Assert.assertEquals(newStartTime, existingSeriesEndTime);
+    Assert.assertEquals(newEndTime, existingSeriesStartTime);
+
+    verify(configuration, factory);
+
+  }
+
+  /**
+   * Exercises {@code updateTimelineMetricsInCache} on a cache value that
+   * already holds two series ("cpu_user" and "cpu_nice") and is refreshed
+   * with new datapoints for "cpu_user" only.
+   *
+   * The assertions expect that, after the update for the window
+   * [now, now + 2000], both series are still present, "cpu_nice" keeps its
+   * three datapoints, and "cpu_user" holds exactly the three new datapoints
+   * (now + 1400, now + 1500, now + 1600).
+   */
+  @Test
+  public void testTimelineMetricCacheTimeseriesUpdates() throws Exception {
+    Configuration configuration = createNiceMock(Configuration.class);
+    expect(configuration.getMetricsRequestConnectTimeoutMillis()).andReturn(10000);
+    expect(configuration.getMetricsRequestReadTimeoutMillis()).andReturn(10000);
+    expect(configuration.getMetricsRequestIntervalReadTimeoutMillis()).andReturn(10000);
+    // Disable buffer fudge factor
+    expect(configuration.getMetricRequestBufferTimeCatchupInterval()).andReturn(0L);
+
+    replay(configuration);
+
+    // Partial mock: only the constructor is wired to the mocked configuration.
+    TimelineMetricCacheEntryFactory factory =
+      createMockBuilder(TimelineMetricCacheEntryFactory.class)
+        .withConstructor(configuration).createMock();
+
+    replay(factory);
+
+    long now = System.currentTimeMillis();
+
+    // Existing values: "cpu_user" has datapoints before 'now',
+    // "cpu_nice" has datapoints after 'now'.
+    TimelineMetric timelineMetric1 = new TimelineMetric();
+    timelineMetric1.setMetricName("cpu_user");
+    timelineMetric1.setAppId("app1");
+    Map<Long, Double> metricValues = new TreeMap<Long, Double>();
+    metricValues.put(now - 100, 1.0);
+    metricValues.put(now - 200, 2.0);
+    metricValues.put(now - 300, 3.0);
+    timelineMetric1.setMetricValues(metricValues);
+
+    TimelineMetric timelineMetric2 = new TimelineMetric();
+    timelineMetric2.setMetricName("cpu_nice");
+    timelineMetric2.setAppId("app1");
+    metricValues = new TreeMap<Long, Double>();
+    metricValues.put(now + 400, 1.0);
+    metricValues.put(now + 500, 2.0);
+    metricValues.put(now + 600, 3.0);
+    timelineMetric2.setMetricValues(metricValues);
+
+    // Plain HashMap instead of double-brace initialisation, which would
+    // create an anonymous subclass holding a reference to this test instance.
+    Map<String, TimelineMetric> existingMetrics =
+      new HashMap<String, TimelineMetric>();
+    existingMetrics.put("cpu_user", timelineMetric1);
+    existingMetrics.put("cpu_nice", timelineMetric2);
+
+    TimelineMetricsCacheValue existingMetricValue =
+      new TimelineMetricsCacheValue(now - 1000, now + 1000, existingMetrics);
+
+    // New values: only "cpu_user", all after the cached end time (now + 1000).
+    TimelineMetrics newMetrics = new TimelineMetrics();
+    TimelineMetric timelineMetric3 = new TimelineMetric();
+    timelineMetric3.setMetricName("cpu_user");
+    timelineMetric3.setAppId("app1");
+    metricValues = new TreeMap<Long, Double>();
+    metricValues.put(now + 1400, 1.0);
+    metricValues.put(now + 1500, 2.0);
+    metricValues.put(now + 1600, 3.0);
+    timelineMetric3.setMetricValues(metricValues);
+    newMetrics.getMetrics().add(timelineMetric3);
+
+    factory.updateTimelineMetricsInCache(newMetrics, existingMetricValue,
+      now, now + 2000);
+
+    Assert.assertEquals(2, existingMetricValue.getTimelineMetrics().size());
+    Assert.assertEquals(3,
+      existingMetricValue.getTimelineMetrics().get("cpu_user").getMetricValues().size());
+    Assert.assertEquals(3,
+      existingMetricValue.getTimelineMetrics().get("cpu_nice").getMetricValues().size());
+
+    // The "cpu_user" series is expected to contain only the new timestamps,
+    // in ascending order.
+    Map<Long, Double> newMetricsMap =
+      existingMetricValue.getTimelineMetrics().get("cpu_user").getMetricValues();
+    Iterator<Long> metricKeyIterator = newMetricsMap.keySet().iterator();
+    Assert.assertEquals(now + 1400, metricKeyIterator.next().longValue());
+    Assert.assertEquals(now + 1500, metricKeyIterator.next().longValue());
+    Assert.assertEquals(now + 1600, metricKeyIterator.next().longValue());
+
+    verify(configuration, factory);
+  }
+
+  /**
+   * Checks that {@code TimelineAppMetricCacheKey.equals} depends on the set
+   * of metric names: two keys with the same app id and temporal info differ
+   * while their metric-name sets differ, and compare equal once the missing
+   * name has been added to the second key.
+   */
+  @Test
+  public void testEqualsOnKeys() {
+    long currentMillis = System.currentTimeMillis();
+    TemporalInfo window =
+      new TemporalInfoImpl(currentMillis - 1000, currentMillis, 1);
+
+    // First key: two metric names.
+    HashSet<String> twoNames = new HashSet<String>();
+    twoNames.add("cpu_num._avg");
+    twoNames.add("proc_run._avg");
+    TimelineAppMetricCacheKey key1 =
+      new TimelineAppMetricCacheKey(twoNames, "HOST", window);
+
+    // Second key: only one of those names, everything else identical.
+    HashSet<String> oneName = new HashSet<String>();
+    oneName.add("cpu_num._avg");
+    TimelineAppMetricCacheKey key2 =
+      new TimelineAppMetricCacheKey(oneName, "HOST", window);
+
+    // Inequality is checked in both directions.
+    Assert.assertFalse(key1.equals(key2));
+    Assert.assertFalse(key2.equals(key1));
+
+    // Bring the second key's metric names in line with the first.
+    key2.getMetricNames().add("proc_run._avg");
+
+    Assert.assertTrue(key1.equals(key2));
+  }
+}

Reply via email to