http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
new file mode 100644
index 0000000..967d819
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSource.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCache;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCacheProvider;
+
+public class RawMetricsSource implements InternalMetricsSource {
+  private static final Log LOG = LogFactory.getLog(RawMetricsSource.class);
+  private final int internalCacheInterval;
+  private final ExternalMetricsSink rawMetricsSink;
+  private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+  private final InternalMetricsCache cache;
+  static final String RAW_METRICS_CACHE = "RAW_METRICS_CACHE_INSTANCE";
+
+  public RawMetricsSource(int internalCacheInterval, ExternalMetricsSink rawMetricsSink) {
+    this.internalCacheInterval = internalCacheInterval;
+    this.rawMetricsSink = rawMetricsSink;
+    this.cache = InternalMetricsCacheProvider.getInstance().getCacheInstance(RAW_METRICS_CACHE);
+    if (rawMetricsSink.getFlushSeconds() > internalCacheInterval) {
+      initializeFixedRateScheduler();
+    }
+  }
+
+  @Override
+  public void publishTimelineMetrics(Collection<TimelineMetrics> metrics) {
+    // TODO: Adjust default flush to reasonable defaults > 3 seconds
+    if (rawMetricsSink.getFlushSeconds() > internalCacheInterval) {
+      // Need to cache only if external sink cannot keep up and thereby has
+      // different flush interval as compared to HBase flush
+      cache.putAll(metrics); // Scheduler initialized already for flush
+    } else {
+      submitDataWithTimeout(metrics);
+    }
+  }
+
+  private void initializeFixedRateScheduler() {
+    executorService.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        rawMetricsSink.sinkMetricData(cache.evictAll());
+      }
+    }, rawMetricsSink.getFlushSeconds(), rawMetricsSink.getFlushSeconds(), TimeUnit.SECONDS);
+  }
+
+  private void submitDataWithTimeout(final Collection<TimelineMetrics> metrics) {
+    Future f = executorService.submit(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        rawMetricsSink.sinkMetricData(metrics);
+        return null;
+      }
+    });
+    try {
+      f.get(rawMetricsSink.getSinkTimeOutSeconds(), TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn("Raw metrics sink interrupted.");
+    } catch (ExecutionException e) {
+      LOG.warn("Exception on sinking metrics", e);
+    } catch (TimeoutException e) {
+      LOG.warn("Timeout exception on sinking metrics", e);
+    }
+  }
+
+}
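The publish path above branches on how the sink's flush cadence compares to the collector's internal cache interval: a sink that flushes less often than the internal interval has its batches staged in the internal ehcache and drained by the fixed-rate scheduler, while a faster sink receives each batch directly, bounded by its timeout. The hypothetical sketch below (not part of the commit) illustrates that contract; it assumes ExternalMetricsSink declares only the three members RawMetricsSource uses here (getFlushSeconds, getSinkTimeOutSeconds, sinkMetricData(Collection<TimelineMetrics>)) and that both interval getters return int, which may not match the real interface.

import java.util.Collection;

import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;

// Hypothetical sink, for illustration only: it flushes every 10 seconds, so a
// RawMetricsSource built with an internal cache interval of 3 seconds
// (new RawMetricsSource(3, new LoggingMetricsSink())) takes the cached,
// scheduler-driven path; a flush interval of 3 seconds or less would instead
// take the direct submitDataWithTimeout() path.
public class LoggingMetricsSink implements ExternalMetricsSink {

  @Override
  public int getFlushSeconds() {
    return 10;
  }

  @Override
  public int getSinkTimeOutSeconds() {
    return 2;
  }

  @Override
  public void sinkMetricData(Collection<TimelineMetrics> metrics) {
    // A real sink would forward the batches to an external system.
    System.out.println("Received " + (metrics == null ? 0 : metrics.size()) + " metric batches");
  }
}

Note that constructing RawMetricsSource also pulls in the internal cache through InternalMetricsCacheProvider, so exercising such a sink end-to-end additionally requires the collector's TimelineMetricConfiguration to be available.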
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
new file mode 100644
index 0000000..28d457d
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheKey.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+public class InternalMetricCacheKey {
+  private String metricName;
+  private String appId;
+  private String instanceId;
+  private String hostname;
+  private long startTime; // Useful for debugging
+
+  public InternalMetricCacheKey(String metricName, String appId, String instanceId, String hostname, long startTime) {
+    this.metricName = metricName;
+    this.appId = appId;
+    this.instanceId = instanceId;
+    this.hostname = hostname;
+    this.startTime = startTime;
+  }
+
+  public String getMetricName() {
+    return metricName;
+  }
+
+  public void setMetricName(String metricName) {
+    this.metricName = metricName;
+  }
+
+  public String getAppId() {
+    return appId;
+  }
+
+  public void setAppId(String appId) {
+    this.appId = appId;
+  }
+
+  public String getInstanceId() {
+    return instanceId;
+  }
+
+  public void setInstanceId(String instanceId) {
+    this.instanceId = instanceId;
+  }
+
+  public String getHostname() {
+    return hostname;
+  }
+
+  public void setHostname(String hostname) {
+    this.hostname = hostname;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    InternalMetricCacheKey that = (InternalMetricCacheKey) o;
+
+    if (!getMetricName().equals(that.getMetricName())) return false;
+    if (!getAppId().equals(that.getAppId())) return false;
+    if (getInstanceId() != null ? !getInstanceId().equals(that.getInstanceId()) : that.getInstanceId() != null)
+      return false;
+    return getHostname() != null ? getHostname().equals(that.getHostname()) : that.getHostname() == null;
+
+  }
+
+  @Override
+  public int hashCode() {
+    int result = getMetricName().hashCode();
+    result = 31 * result + getAppId().hashCode();
+    result = 31 * result + (getInstanceId() != null ? getInstanceId().hashCode() : 0);
+    result = 31 * result + (getHostname() != null ? getHostname().hashCode() : 0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "InternalMetricCacheKey{" +
+      "metricName='" + metricName + '\'' +
+      ", appId='" + appId + '\'' +
+      ", instanceId='" + instanceId + '\'' +
+      ", hostname='" + hostname + '\'' +
+      ", startTime=" + startTime +
+      '}';
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
new file mode 100644
index 0000000..a4dabe7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricCacheValue.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class InternalMetricCacheValue {
+  private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+
+  public TreeMap<Long, Double> getMetricValues() {
+    return metricValues;
+  }
+
+  public void setMetricValues(TreeMap<Long, Double> metricValues) {
+    this.metricValues = metricValues;
+  }
+
+  public void addMetricValues(Map<Long, Double> metricValues) {
+    this.metricValues.putAll(metricValues);
+  }
+}
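One detail worth noting about the key class above: equals() and hashCode() deliberately leave startTime out, so successive batches of the same series (same metric name, appId, instanceId and hostname) collide on a single cache element and InternalMetricsCache (below) merges their values via addMetricValues() rather than storing duplicates; startTime is retained only for debugging and for stamping the metric when the cache is drained. A minimal check of that behavior, assuming nothing beyond the class shown above (the metric and host values are example data):

import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricCacheKey;

public class CacheKeyEqualityCheck {
  public static void main(String[] args) {
    // The same series reported with two different start times.
    InternalMetricCacheKey first =
        new InternalMetricCacheKey("disk_free", "HOST", null, "host1.example.com", 1000L);
    InternalMetricCacheKey second =
        new InternalMetricCacheKey("disk_free", "HOST", null, "host1.example.com", 2000L);

    // Both print true: startTime does not participate in equality or hashing.
    System.out.println(first.equals(second));
    System.out.println(first.hashCode() == second.hashCode());
  }
}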
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
new file mode 100644
index 0000000..a4ed9bc
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCache.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; + +import net.sf.ehcache.Cache; +import net.sf.ehcache.CacheException; +import net.sf.ehcache.CacheManager; +import net.sf.ehcache.Ehcache; +import net.sf.ehcache.Element; +import net.sf.ehcache.config.CacheConfiguration; +import net.sf.ehcache.config.PersistenceConfiguration; +import net.sf.ehcache.config.SizeOfPolicyConfiguration; +import net.sf.ehcache.event.CacheEventListener; +import net.sf.ehcache.store.MemoryStoreEvictionPolicy; + +public class InternalMetricsCache { + private static final Log LOG = LogFactory.getLog(InternalMetricsCache.class); + private final String instanceName; + private final String maxHeapPercent; + private volatile boolean isCacheInitialized = false; + private Cache cache; + static final String TIMELINE_METRIC_CACHE_MANAGER_NAME = "internalMetricsCacheManager"; + private final Lock lock = new ReentrantLock(); + private static final int LOCK_TIMEOUT_SECONDS = 2; + + public InternalMetricsCache(String instanceName, String maxHeapPercent) { + this.instanceName = instanceName; + this.maxHeapPercent = maxHeapPercent; + initialize(); + } + + private void initialize() { + // Check in case of contention to avoid ObjectExistsException + if (isCacheInitialized) { + throw new RuntimeException("Cannot initialize internal cache twice"); + } + + System.setProperty("net.sf.ehcache.skipUpdateCheck", "true"); + System.setProperty("net.sf.ehcache.sizeofengine." + TIMELINE_METRIC_CACHE_MANAGER_NAME, + "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache.InternalMetricsCacheSizeOfEngine"); + + net.sf.ehcache.config.Configuration managerConfig = + new net.sf.ehcache.config.Configuration(); + managerConfig.setName(TIMELINE_METRIC_CACHE_MANAGER_NAME); + + // Set max heap available to the cache manager + managerConfig.setMaxBytesLocalHeap(maxHeapPercent); + + //Create a singleton CacheManager using defaults + CacheManager manager = CacheManager.create(managerConfig); + + LOG.info("Creating Metrics Cache with maxHeapPercent => " + maxHeapPercent); + + // Create a Cache specifying its configuration. 
+ CacheConfiguration cacheConfiguration = new CacheConfiguration() + .name(instanceName) + .memoryStoreEvictionPolicy(MemoryStoreEvictionPolicy.LRU) + .sizeOfPolicy(new SizeOfPolicyConfiguration() // Set sizeOf policy to continue on max depth reached - avoid OOM + .maxDepth(10000) + .maxDepthExceededBehavior(SizeOfPolicyConfiguration.MaxDepthExceededBehavior.CONTINUE)) + .eternal(true) // infinite time until eviction + .persistence(new PersistenceConfiguration() + .strategy(PersistenceConfiguration.Strategy.NONE.name())); + + cache = new Cache(cacheConfiguration); + cache.getCacheEventNotificationService().registerListener(new InternalCacheEvictionListener()); + + LOG.info("Registering internal metrics cache with provider: name = " + + cache.getName() + ", guid: " + cache.getGuid()); + + manager.addCache(cache); + + isCacheInitialized = true; + } + + public InternalMetricCacheValue getInternalMetricCacheValue(InternalMetricCacheKey key) { + Element ele = cache.get(key); + if (ele != null) { + return (InternalMetricCacheValue) ele.getObjectValue(); + } + return null; + } + + public Collection<TimelineMetrics> evictAll() { + TimelineMetrics metrics = new TimelineMetrics(); + try { + if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + try{ + List keys = cache.getKeys(); + for (Object obj : keys) { + TimelineMetric metric = new TimelineMetric(); + InternalMetricCacheKey key = (InternalMetricCacheKey) obj; + metric.setMetricName(key.getMetricName()); + metric.setAppId(key.getAppId()); + metric.setInstanceId(key.getInstanceId()); + metric.setHostName(key.getHostname()); + metric.setStartTime(key.getStartTime()); + metric.setTimestamp(key.getStartTime()); + Element ele = cache.get(key); + metric.setMetricValues(((InternalMetricCacheValue) ele.getObjectValue()).getMetricValues()); + metrics.getMetrics().add(metric); + } + cache.removeAll(); + } finally { + lock.unlock(); + } + } else { + LOG.warn("evictAll: Unable to acquire lock on the cache instance. " + + "Giving up after " + LOCK_TIMEOUT_SECONDS + " seconds."); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting to acquire lock"); + } + + return Collections.singletonList(metrics); + } + + public void putAll(Collection<TimelineMetrics> metrics) { + try { + if (lock.tryLock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + try { + if (metrics != null) { + for (TimelineMetrics timelineMetrics : metrics) { + for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) { + InternalMetricCacheKey key = new InternalMetricCacheKey( + timelineMetric.getMetricName(), + timelineMetric.getAppId(), + timelineMetric.getInstanceId(), + timelineMetric.getHostName(), + timelineMetric.getStartTime() + ); + + Element ele = cache.get(key); + if (ele != null) { + InternalMetricCacheValue value = (InternalMetricCacheValue) ele.getObjectValue(); + value.addMetricValues(timelineMetric.getMetricValues()); + } else { + InternalMetricCacheValue value = new InternalMetricCacheValue(); + value.setMetricValues(timelineMetric.getMetricValues()); + cache.put(new Element(key, value)); + } + } + } + } + } finally { + lock.unlock(); + } + } else { + LOG.warn("putAll: Unable to acquire lock on the cache instance. 
" + + "Giving up after " + LOCK_TIMEOUT_SECONDS + " seconds."); + } + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting to acquire lock"); + } + } + + class InternalCacheEvictionListener implements CacheEventListener { + + @Override + public void notifyElementRemoved(Ehcache cache, Element element) throws CacheException { + // expected + } + + @Override + public void notifyElementPut(Ehcache cache, Element element) throws CacheException { + // do nothing + } + + @Override + public void notifyElementUpdated(Ehcache cache, Element element) throws CacheException { + // do nothing + } + + @Override + public void notifyElementExpired(Ehcache cache, Element element) { + // do nothing + } + + @Override + public void notifyElementEvicted(Ehcache cache, Element element) { + // Bad - Remote endpoint cannot keep up resulting in flooding + InternalMetricCacheKey key = (InternalMetricCacheKey) element.getObjectKey(); + LOG.warn("Evicting element from internal metrics cache, metric => " + key + .getMetricName() + ", startTime = " + new Date(key.getStartTime())); + } + + @Override + public void notifyRemoveAll(Ehcache cache) { + // expected + } + + @Override + public Object clone() throws CloneNotSupportedException { + return null; + } + + @Override + public void dispose() { + // do nothing + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java new file mode 100644 index 0000000..3e0dc1b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheProvider.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+
+public class InternalMetricsCacheProvider {
+  private Map<String, InternalMetricsCache> metricsCacheMap = new ConcurrentHashMap<>();
+  private static final InternalMetricsCacheProvider instance = new InternalMetricsCacheProvider();
+
+  private InternalMetricsCacheProvider() {
+  }
+
+  public static InternalMetricsCacheProvider getInstance() {
+    return instance;
+  }
+
+  public InternalMetricsCache getCacheInstance(String instanceName) {
+    if (metricsCacheMap.containsKey(instanceName)) {
+      return metricsCacheMap.get(instanceName);
+    } else {
+      TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+      InternalMetricsCache cache = new InternalMetricsCache(instanceName,
+        conf.getInternalCacheHeapPercent(instanceName));
+
+      metricsCacheMap.put(instanceName, cache);
+      return cache;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
new file mode 100644
index 0000000..d1a1a89
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/cache/InternalMetricsCacheSizeOfEngine.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.cache; + +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsEhCacheSizeOfEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import net.sf.ehcache.pool.Size; +import net.sf.ehcache.pool.SizeOfEngine; + +public class InternalMetricsCacheSizeOfEngine extends TimelineMetricsEhCacheSizeOfEngine { + private final static Logger LOG = LoggerFactory.getLogger(InternalMetricsCacheSizeOfEngine.class); + + private InternalMetricsCacheSizeOfEngine(SizeOfEngine underlying) { + super(underlying); + } + + public InternalMetricsCacheSizeOfEngine() { + // Invoke default constructor in base class + } + + @Override + public Size sizeOf(Object key, Object value, Object container) { + try { + LOG.debug("BEGIN - Sizeof, key: {}, value: {}", key, value); + long size = 0; + if (key instanceof InternalMetricCacheKey) { + InternalMetricCacheKey metricCacheKey = (InternalMetricCacheKey) key; + size += reflectionSizeOf.sizeOf(metricCacheKey.getMetricName()); + size += reflectionSizeOf.sizeOf(metricCacheKey.getAppId()); + size += reflectionSizeOf.sizeOf(metricCacheKey.getInstanceId()); // null safe + size += reflectionSizeOf.sizeOf(metricCacheKey.getHostname()); + } + if (value instanceof InternalMetricCacheValue) { + size += getValueMapSize(((InternalMetricCacheValue) value).getMetricValues()); + } + // Mark size as not being exact + return new Size(size, false); + } finally { + LOG.debug("END - Sizeof, key: {}", key); + } + } + + @Override + public SizeOfEngine copyWith(int maxDepth, boolean abortWhenMaxDepthExceeded) { + LOG.debug("Copying tracing sizeof engine, maxdepth: {}, abort: {}", + maxDepth, abortWhenMaxDepthExceeded); + + return new InternalMetricsCacheSizeOfEngine(underlying.copyWith(maxDepth, abortWhenMaxDepthExceeded)); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java index 3688630..41ddef5 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java @@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricsService; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController; @@ -73,7 +73,7 @@ import static org.powermock.api.support.membermodification.MemberMatcher.method; import static org.powermock.api.support.membermodification.MemberModifier.suppress; @RunWith(PowerMockRunner.class) -@PrepareForTest({ PhoenixHBaseAccessor.class, HBaseTimelineMetricStore.class, UserGroupInformation.class, +@PrepareForTest({ PhoenixHBaseAccessor.class, HBaseTimelineMetricsService.class, UserGroupInformation.class, ClientCnxn.class, DefaultPhoenixDataSource.class, ConnectionFactory.class, TimelineMetricConfiguration.class, ApplicationHistoryServer.class }) @PowerMockIgnore( {"javax.management.*"}) http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java index 611d82e..fbf7b09 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java @@ -17,10 +17,32 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.assertj.core.api.Assertions.assertThat; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replayAll; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; @@ -41,24 +63,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.io.IOException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.PreparedStatement; 
-import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.assertj.core.api.Assertions.assertThat; - public abstract class AbstractMiniHBaseClusterTest extends BaseTest { protected static final long BATCH_SIZE = 3; @@ -200,11 +204,8 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest { metricsConf.setLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE, 600000); return - new PhoenixHBaseAccessor( - new Configuration(), - metricsConf, + new PhoenixHBaseAccessor(new TimelineMetricConfiguration(new Configuration(), metricsConf), new PhoenixConnectionProvider() { - @Override public HBaseAdmin getHBaseAdmin() throws IOException { try { @@ -229,7 +230,7 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest { } protected void insertMetricRecords(Connection conn, TimelineMetrics metrics, long currentTime) - throws SQLException, IOException { + throws SQLException, IOException { List<TimelineMetric> timelineMetrics = metrics.getMetrics(); if (timelineMetrics == null || timelineMetrics.isEmpty()) { http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java index aae1d4b..f035678 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java @@ -33,7 +33,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function.PostProcessingFunction.RATE; import static org.assertj.core.api.Assertions.assertThat; -public class HBaseTimelineMetricStoreTest { +public class HBaseTimelineMetricsServiceTest { public static final String MEM_METRIC = "mem"; public static final String BYTES_IN_METRIC = "bytes_in"; @@ -51,7 +51,7 @@ public class HBaseTimelineMetricStoreTest { //when Multimap<String, List<Function>> multimap = - HBaseTimelineMetricStore.parseMetricNamesToAggregationFunctions(metricNames); + HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames); //then Assert.assertEquals(multimap.keySet().size(), 3); @@ -104,7 +104,7 @@ public class 
HBaseTimelineMetricStoreTest { metricValues.put(1454016728371L, 1011.25); // Calculate rate - Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), false); + Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false); // Make sure rate is zero for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) { @@ -122,7 +122,7 @@ public class HBaseTimelineMetricStoreTest { metricValues.put(1454016548371L, 1010.25); metricValues.put(1454016608371L, 1010.25); - Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), true); + Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true); Assert.assertTrue(rates.size()==4); Assert.assertTrue(rates.containsValue(-1.0)); http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java index d5baaef..f6d69f6 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java @@ -17,9 +17,33 @@ */ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Multimap; -import junit.framework.Assert; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; @@ -40,35 +64,10 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; import org.junit.Test; -import java.io.IOException; -import java.lang.reflect.Array; -import java.lang.reflect.Field; -import java.sql.SQLException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertTrue; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createMetricHostAggregate; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES; +import junit.framework.Assert; @@ -93,7 +92,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { // WHEN long endTime = ctime + minute; Condition condition = new DefaultCondition( - Collections.singletonList("disk_free"), Collections.singletonList("local1"), + new ArrayList<String>() {{ add("disk_free"); }}, + Collections.singletonList("local1"), null, null, startTime, endTime, Precision.SECONDS, null, true); TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition, singletonValueFunctionMap("disk_free")); @@ -117,18 
+117,19 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { long ctime = startTime; long minute = 60 * 1000; hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 1)); + "disk_free", 1)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + minute, "local1", - "disk_free", 2)); + "disk_free", 2)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 2)); + "disk_free", 2)); long endTime = ctime + minute; boolean success = aggregatorMinute.doWork(startTime, endTime); assertTrue(success); // WHEN Condition condition = new DefaultCondition( - Collections.singletonList("disk_free"), Collections.singletonList("local1"), + new ArrayList<String>() {{ add("disk_free"); }}, + Collections.singletonList("local1"), null, null, startTime, endTime, Precision.MINUTES, null, false); TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition, singletonValueFunctionMap("disk_free")); @@ -151,10 +152,10 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration(), null); MetricHostAggregate expectedAggregate = - createMetricHostAggregate(2.0, 0.0, 20, 15.0); + createMetricHostAggregate(2.0, 0.0, 20, 15.0); Map<TimelineMetric, MetricHostAggregate> - aggMap = new HashMap<TimelineMetric, - MetricHostAggregate>(); + aggMap = new HashMap<TimelineMetric, + MetricHostAggregate>(); long startTime = System.currentTimeMillis(); int min_5 = 5 * 60 * 1000; @@ -179,7 +180,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { // WHEN Condition condition = new DefaultCondition( - Collections.singletonList("disk_used"), Collections.singletonList("test_host"), + new ArrayList<String>() {{ add("disk_free"); }}, + Collections.singletonList("test_host"), "test_app", null, startTime, endTime, Precision.HOURS, null, true); TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition, singletonValueFunctionMap("disk_used")); @@ -200,20 +202,20 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { // GIVEN TimelineMetricAggregator agg = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( - hdb, new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null); + hdb, new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null); long startTime = System.currentTimeMillis(); long ctime = startTime + 1; long minute = 60 * 1000; hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 1)); + "disk_free", 1)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 2)); + "disk_free", 2)); ctime += minute; hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 2)); + "disk_free", 2)); hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 1)); + "disk_free", 1)); long endTime = ctime + minute + 1; boolean success = agg.doWork(startTime, endTime); @@ -221,8 +223,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { // WHEN Condition condition = new DefaultCondition( - Collections.singletonList("disk_free"), null, null, null, - startTime, endTime, Precision.SECONDS, null, true); + new ArrayList<String>() {{ add("disk_free"); }}, + null, null, null, startTime, endTime, Precision.SECONDS, null, true); TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, 
singletonValueFunctionMap("disk_free")); @@ -240,7 +242,7 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { // GIVEN TimelineMetricAggregator agg = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - new Configuration(), new TimelineMetricMetadataManager(hdb, new Configuration()), null); + new Configuration(), new TimelineMetricMetadataManager(new Configuration(), hdb), null); long startTime = System.currentTimeMillis(); long ctime = startTime + 1; @@ -261,8 +263,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { // WHEN Condition condition = new DefaultCondition( - Collections.singletonList("disk_free"), null, null, null, - null, null, Precision.SECONDS, null, true); + new ArrayList<String>() {{ add("disk_free"); }}, + null, null, null, null, null, Precision.SECONDS, null, true); Multimap<String, List<Function>> mmap = ArrayListMultimap.create(); mmap.put("disk_free", Collections.singletonList(new Function(Function.ReadFunction.SUM, null))); @@ -288,14 +290,14 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { long minute = 60 * 1000; Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); records.put(createEmptyTimelineClusterMetric(ctime), new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); records.put(createEmptyTimelineClusterMetric(ctime += minute), new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); records.put(createEmptyTimelineClusterMetric(ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); records.put(createEmptyTimelineClusterMetric(ctime += minute), new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); @@ -305,8 +307,8 @@ public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { // WHEN Condition condition = new DefaultCondition( - Collections.singletonList("disk_used"), null, null, null, - startTime, ctime + minute, Precision.HOURS, null, true); + new ArrayList<String>() {{ add("disk_free"); }}, + null, null, null, startTime, ctime + minute, Precision.HOURS, null, true); TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, singletonValueFunctionMap("disk_used")); http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java index d668178..bf9246d 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. 
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import org.apache.phoenix.exception.PhoenixIOException; import org.easymock.EasyMock; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.api.easymock.PowerMock; @@ -42,6 +43,7 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; import java.io.IOException; +import java.lang.reflect.Field; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -53,22 +55,36 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.powermock.api.easymock.PowerMock.*; @RunWith(PowerMockRunner.class) -@PrepareForTest(PhoenixTransactSQL.class) +@PrepareForTest({PhoenixTransactSQL.class, TimelineMetricConfiguration.class}) public class PhoenixHBaseAccessorTest { private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; - @Test - public void testGetMetricRecords() throws SQLException, IOException { + PhoenixConnectionProvider connectionProvider; + PhoenixHBaseAccessor accessor; + @Before + public void setupConf() throws Exception { Configuration hbaseConf = new Configuration(); hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum"); Configuration metricsConf = new Configuration(); + metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1"); + metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100"); + metricsConf.setStrings( + TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, + "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsAggregatorMemorySink"); + + TimelineMetricConfiguration conf = new TimelineMetricConfiguration(hbaseConf, metricsConf); + mockStatic(TimelineMetricConfiguration.class); + expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes(); + replayAll(); - PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() { + connectionProvider = new PhoenixConnectionProvider() { @Override public HBaseAdmin getHBaseAdmin() throws IOException { return null; @@ -80,21 +96,24 @@ public class PhoenixHBaseAccessorTest { } }; - PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider); + accessor = new PhoenixHBaseAccessor(connectionProvider); + } + @Test + public void testGetMetricRecords() throws SQLException, IOException { List<String> metricNames = new LinkedList<>(); List<String> hostnames = new LinkedList<>(); Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create(); - PowerMock.mockStatic(PhoenixTransactSQL.class); + mockStatic(PhoenixTransactSQL.class); PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class); Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true); - EasyMock.expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once(); + expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once(); ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class); - EasyMock.expect(preparedStatementMock.executeQuery()).andReturn(rsMock); + 
expect(preparedStatementMock.executeQuery()).andReturn(rsMock); - PowerMock.replayAll(); + replayAll(); EasyMock.replay(preparedStatementMock, rsMock); // Check when startTime < endTime @@ -105,104 +124,64 @@ public class PhoenixHBaseAccessorTest { TimelineMetrics tml2 = accessor.getMetricRecords(condition2, metricFunctions); assertEquals(0, tml2.getMetrics().size()); - PowerMock.verifyAll(); + verifyAll(); EasyMock.verify(preparedStatementMock, rsMock); } @Test - public void testGetMetricRecordsIOException() - throws SQLException, IOException { - - Configuration hbaseConf = new Configuration(); - hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum"); - Configuration metricsConf = new Configuration(); - - PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() { - @Override - public HBaseAdmin getHBaseAdmin() throws IOException { - return null; - } - - @Override - public Connection getConnection() throws SQLException { - return null; - } - }; - - PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider); - + public void testGetMetricRecordsIOException() throws SQLException, IOException { List<String> metricNames = new LinkedList<>(); List<String> hostnames = new LinkedList<>(); Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create(); - PowerMock.mockStatic(PhoenixTransactSQL.class); + mockStatic(PhoenixTransactSQL.class); PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class); Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true); - EasyMock.expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once(); + expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once(); ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class); RuntimeException runtimeException = EasyMock.createNiceMock(RuntimeException.class); IOException io = EasyMock.createNiceMock(IOException.class); - EasyMock.expect(preparedStatementMock.executeQuery()).andThrow(runtimeException); - EasyMock.expect(runtimeException.getCause()).andReturn(io).atLeastOnce(); + expect(preparedStatementMock.executeQuery()).andThrow(runtimeException); + expect(runtimeException.getCause()).andReturn(io).atLeastOnce(); StackTraceElement stackTrace[] = new StackTraceElement[]{new StackTraceElement("TimeRange","method","file",1)}; - EasyMock.expect(io.getStackTrace()).andReturn(stackTrace).atLeastOnce(); + expect(io.getStackTrace()).andReturn(stackTrace).atLeastOnce(); - PowerMock.replayAll(); + replayAll(); EasyMock.replay(preparedStatementMock, rsMock, io, runtimeException); TimelineMetrics tml = accessor.getMetricRecords(condition, metricFunctions); assertEquals(0, tml.getMetrics().size()); - PowerMock.verifyAll(); + verifyAll(); EasyMock.verify(preparedStatementMock, rsMock, io, runtimeException); } @Test - public void testGetMetricRecordsPhoenixIOExceptionDoNotRetryException() - throws SQLException, IOException { - - Configuration hbaseConf = new Configuration(); - hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum"); - Configuration metricsConf = new Configuration(); - - PhoenixConnectionProvider connectionProvider = new PhoenixConnectionProvider() { - @Override - public HBaseAdmin getHBaseAdmin() throws IOException { - return null; - } - - @Override - public Connection getConnection() throws SQLException { - return null; - } - }; - - PhoenixHBaseAccessor 
accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider); - + public void testGetMetricRecordsPhoenixIOExceptionDoNotRetryException() throws SQLException, IOException { List<String> metricNames = new LinkedList<>(); List<String> hostnames = new LinkedList<>(); Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create(); - PowerMock.mockStatic(PhoenixTransactSQL.class); + mockStatic(PhoenixTransactSQL.class); PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class); Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", null, null, Precision.SECONDS, 10, true); - EasyMock.expect(PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(null, condition)).andReturn(preparedStatementMock).once(); + expect(PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(null, condition)).andReturn(preparedStatementMock).once(); PhoenixTransactSQL.setSortMergeJoinEnabled(true); EasyMock.expectLastCall(); ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class); PhoenixIOException pioe1 = EasyMock.createNiceMock(PhoenixIOException.class); PhoenixIOException pioe2 = EasyMock.createNiceMock(PhoenixIOException.class); DoNotRetryIOException dnrioe = EasyMock.createNiceMock(DoNotRetryIOException.class); - EasyMock.expect(preparedStatementMock.executeQuery()).andThrow(pioe1); - EasyMock.expect(pioe1.getCause()).andReturn(pioe2).atLeastOnce(); - EasyMock.expect(pioe2.getCause()).andReturn(dnrioe).atLeastOnce(); + expect(preparedStatementMock.executeQuery()).andThrow(pioe1); + expect(pioe1.getCause()).andReturn(pioe2).atLeastOnce(); + expect(pioe2.getCause()).andReturn(dnrioe).atLeastOnce(); StackTraceElement stackTrace[] = new StackTraceElement[]{new StackTraceElement("HashJoinRegionScanner","method","file",1)}; - EasyMock.expect(dnrioe.getStackTrace()).andReturn(stackTrace).atLeastOnce(); + expect(dnrioe.getStackTrace()).andReturn(stackTrace).atLeastOnce(); - PowerMock.replayAll(); + replayAll(); EasyMock.replay(preparedStatementMock, rsMock, pioe1, pioe2, dnrioe); try { accessor.getMetricRecords(condition, metricFunctions); @@ -210,20 +189,17 @@ public class PhoenixHBaseAccessorTest { } catch (Exception e) { //NOP } - PowerMock.verifyAll(); + verifyAll(); } @Test public void testMetricsCacheCommittingWhenFull() throws IOException, SQLException { Configuration hbaseConf = new Configuration(); hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum"); - Configuration metricsConf = new Configuration(); - metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1"); - metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100"); - final Connection connection = EasyMock.createNiceMock(Connection.class); + final Connection connection = EasyMock.createNiceMock(Connection.class); - PhoenixHBaseAccessor accessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf) { + accessor = new PhoenixHBaseAccessor(connectionProvider) { @Override public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) { try { @@ -235,7 +211,7 @@ public class PhoenixHBaseAccessorTest { }; TimelineMetrics timelineMetrics = EasyMock.createNiceMock(TimelineMetrics.class); - EasyMock.expect(timelineMetrics.getMetrics()).andReturn(Collections.singletonList(new TimelineMetric())).anyTimes(); + expect(timelineMetrics.getMetrics()).andReturn(Collections.singletonList(new TimelineMetric())).anyTimes(); connection.commit(); EasyMock.expectLastCall().once(); @@ -250,44 +226,33 @@ public 
class PhoenixHBaseAccessorTest { @Test public void testMetricsAggregatorSink() throws IOException, SQLException { - Configuration hbaseConf = new Configuration(); - hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum"); - Configuration metricsConf = new Configuration(); Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap = new HashMap<>(); Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap = new HashMap<>(); Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<>(); - metricsConf.setStrings( - TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1"); - metricsConf.setStrings( - TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, - "100"); - metricsConf.setStrings( - TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, - "org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsAggregatorMemorySink"); final Connection connection = EasyMock.createNiceMock(Connection.class); - final PreparedStatement statement = - EasyMock.createNiceMock(PreparedStatement.class); - EasyMock.expect(connection.prepareStatement(EasyMock.anyString())) - .andReturn(statement).anyTimes(); + final PreparedStatement statement = EasyMock.createNiceMock(PreparedStatement.class); + expect(connection.prepareStatement(EasyMock.anyString())).andReturn(statement).anyTimes(); EasyMock.replay(statement); EasyMock.replay(connection); - PhoenixConnectionProvider connectionProvider = - new PhoenixConnectionProvider() { - @Override - public HBaseAdmin getHBaseAdmin() throws IOException { - return null; - } + connectionProvider = new PhoenixConnectionProvider() { + + @Override + public HBaseAdmin getHBaseAdmin() throws IOException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return connection; + } + }; - @Override - public Connection getConnection() throws SQLException { - return connection; - } - }; + accessor = new PhoenixHBaseAccessor(connectionProvider); TimelineClusterMetric clusterMetric = new TimelineClusterMetric("metricName", "appId", "instanceId", @@ -303,12 +268,10 @@ public class PhoenixHBaseAccessorTest { clusterTimeAggregateMap.put(clusterMetric, new MetricHostAggregate()); hostAggregateMap.put(timelineMetric, new MetricHostAggregate()); - PhoenixHBaseAccessor accessor = - new PhoenixHBaseAccessor(hbaseConf, metricsConf, connectionProvider); accessor.saveClusterAggregateRecords(clusterAggregateMap); accessor.saveHostAggregateRecords(hostAggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME); - accessor.saveClusterTimeAggregateRecords(clusterTimeAggregateMap, + accessor.saveClusterAggregateRecordsSecond(clusterTimeAggregateMap, PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); TimelineMetricsAggregatorMemorySink memorySink = http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java index 54b8442..dd0378d 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java @@ -66,7 +66,7 @@ public class TimelineMetricStoreWatcherTest { replay(metricStore); TimelineMetricStoreWatcher timelineMetricStoreWatcher = - new TimelineMetricStoreWatcher(metricStore, new TimelineMetricConfiguration()); + new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance()); timelineMetricStoreWatcher.run(); timelineMetricStoreWatcher.run(); timelineMetricStoreWatcher.run(); @@ -97,7 +97,7 @@ public class TimelineMetricStoreWatcherTest { replayAll(); TimelineMetricStoreWatcher timelineMetricStoreWatcher = - new TimelineMetricStoreWatcher(metricStore, new TimelineMetricConfiguration()); + new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance()); timelineMetricStoreWatcher.run(); timelineMetricStoreWatcher.run(); timelineMetricStoreWatcher.run(); http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java index 07fd85d..86c9b40 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java @@ -18,7 +18,29 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; -import junit.framework.Assert; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; 
+ +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; @@ -26,42 +48,13 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.junit.After; -import org.junit.Before; import org.junit.Test; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; +import junit.framework.Assert; public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { 
private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(false); @@ -77,7 +70,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { // GIVEN TimelineMetricAggregator agg = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null); + getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); @@ -130,7 +123,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { // GIVEN TimelineMetricAggregator agg = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null); + getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); @@ -206,7 +199,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { // GIVEN TimelineMetricAggregator agg = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null); + getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); // here we put some metrics tha will be aggregated @@ -290,7 +283,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - hdb.saveClusterTimeAggregateRecords(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); + hdb.saveClusterAggregateRecordsSecond(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); // WHEN agg.doWork(startTime, ctime + hour + 1000); @@ -490,7 +483,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1"); TimelineMetricAggregator agg = TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - conf, new TimelineMetricMetadataManager(hdb, new Configuration()), null); + conf, new TimelineMetricMetadataManager(new Configuration(), hdb), null); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); long startTime = System.currentTimeMillis(); @@ -511,14 +504,13 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { //THEN Condition condition = new DefaultCondition( - Collections.singletonList("cpu_user"), null, "app1", null, + new ArrayList<String>() {{ add("cpu_user"); }}, null, "app1", null, startTime, endTime, null, null, true); condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA), METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt - (conn, condition); + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); ResultSet rs = pstmt.executeQuery(); int recordCount = 0; @@ -542,7 +534,7 @@ public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { public void testClusterAggregateMetricNormalization() throws Exception { TimelineMetricAggregator agg = 
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - getConfigurationForTest(false), new TimelineMetricMetadataManager(hdb, new Configuration()), null); + getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null); TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); // Sample data http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java index c62fd34..3adf770 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataManager.java @@ -45,7 +45,7 @@ public class TestMetadataManager extends AbstractMiniHBaseClusterTest { @Before public void insertDummyRecords() throws IOException, SQLException, URISyntaxException { // Initialize new manager - metadataManager = new TimelineMetricMetadataManager(hdb, new Configuration()); + metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb); final long now = System.currentTimeMillis(); TimelineMetrics timelineMetrics = new TimelineMetrics(); http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java index 181abca..a524b13 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TestMetadataSync.java @@ -68,8 +68,7 @@ public class TestMetadataSync { replay(configuration, hBaseAccessor); - TimelineMetricMetadataManager metadataManager = new - TimelineMetricMetadataManager(hBaseAccessor, configuration); + TimelineMetricMetadataManager metadataManager = new TimelineMetricMetadataManager(new Configuration(), hBaseAccessor); metadataManager.metricMetadataSync = new TimelineMetricMetadataSync(metadataManager); @@ -110,8 +109,7 @@ public class TestMetadataSync { replay(configuration, hBaseAccessor); - TimelineMetricMetadataManager metadataManager = new - TimelineMetricMetadataManager(hBaseAccessor, configuration); + TimelineMetricMetadataManager metadataManager = 
new TimelineMetricMetadataManager(configuration, hBaseAccessor); metadataManager.putIfModifiedTimelineMetricMetadata(metadata1); metadataManager.putIfModifiedTimelineMetricMetadata(metadata2); http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java new file mode 100644 index 0000000..5d3aacb --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/RawMetricsSourceTest.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source; + +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replayAll; + +import java.util.Collection; +import java.util.Collections; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration; +import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import junit.framework.Assert; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(TimelineMetricConfiguration.class) +public class RawMetricsSourceTest { + + @Before + public void setupConf() throws Exception { + TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new + Configuration(), new Configuration()); + mockStatic(TimelineMetricConfiguration.class); + expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes(); + replayAll(); + } + + @Test + public void testRawMetricsSourcedAtFlushInterval() throws Exception { + InternalSourceProvider internalSourceProvider = new DefaultInternalMetricsSourceProvider(); + ExternalMetricsSink rawMetricsSink = createNiceMock(ExternalMetricsSink.class); + expect(rawMetricsSink.getFlushSeconds()).andReturn(1); + expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1); + Capture<Collection<TimelineMetrics>> metricsCapture = new Capture<>(); + rawMetricsSink.sinkMetricData(capture(metricsCapture)); + expectLastCall(); + replay(rawMetricsSink); + + InternalMetricsSource rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + + final long now = System.currentTimeMillis(); + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("m1"); + metric1.setAppId("a1"); + metric1.setInstanceId("i1"); + metric1.setHostName("h1"); + metric1.setStartTime(now - 200); + metric1.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + }}); + timelineMetrics.getMetrics().add(metric1); + + rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics)); + + verify(rawMetricsSink); + } + + @Test(timeout = 10000) + public void testRawMetricsCachedAndSourced() throws Exception { + ExternalMetricsSink rawMetricsSink = createNiceMock(ExternalMetricsSink.class); + expect(rawMetricsSink.getFlushSeconds()).andReturn(2).anyTimes(); + expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1).anyTimes(); + + class CaptureOnce<T> extends Capture<T> { + @Override + public void setValue(T value) { + if (!hasCaptured()) { + 
super.setValue(value); + } + } + } + Capture<Collection<TimelineMetrics>> metricsCapture = new CaptureOnce<>(); + + rawMetricsSink.sinkMetricData(capture(metricsCapture)); + expectLastCall(); + replay(rawMetricsSink); + + InternalSourceProvider internalSourceProvider = new DefaultInternalMetricsSourceProvider(); + InternalMetricsSource rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + + final long now = System.currentTimeMillis(); + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("m1"); + metric1.setAppId("a1"); + metric1.setInstanceId("i1"); + metric1.setHostName("h1"); + metric1.setStartTime(now - 200); + metric1.setTimestamp(now - 200); + metric1.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + }}); + timelineMetrics.getMetrics().add(metric1); + + rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics)); + + // Wait on eviction + Thread.sleep(5000); + + verify(rawMetricsSink); + + Assert.assertTrue(metricsCapture.hasCaptured()); + Assert.assertTrue(metricsCapture.getValue().iterator().next().getMetrics().iterator().next().equals(metric1)); + } + +}
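Editor's note: the PhoenixHBaseAccessorTest changes above drop the old three-argument constructor (hbaseConf, metricsConf, connectionProvider) in favour of a single-argument form that takes only the connection provider; the Configuration objects disappear from the call sites because configuration is now resolved inside the accessor. The sketch below shows how the tests wire a stub provider against that new constructor. It is a minimal illustration assembled from the diff hunks, not code from the commit, and the PhoenixConnectionProvider import path is assumed from the surrounding source layout.

import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;

import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
// Assumed import path for the provider interface.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;

public class AccessorWiringSketch {

  // Stub provider as used in the tests: no real HBase admin, and the Phoenix
  // connection is whatever mock the test supplies.
  static PhoenixConnectionProvider stubProvider(final Connection connection) {
    return new PhoenixConnectionProvider() {
      @Override
      public HBaseAdmin getHBaseAdmin() throws IOException {
        return null; // not exercised by the code paths under test
      }

      @Override
      public Connection getConnection() throws SQLException {
        return connection; // e.g. an EasyMock nice mock
      }
    };
  }

  static PhoenixHBaseAccessor newAccessor(Connection connection) {
    // New single-argument constructor; hbaseConf/metricsConf are no longer passed.
    return new PhoenixHBaseAccessor(stubProvider(connection));
  }
}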
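Editor's note: TimelineMetricStoreWatcherTest and RawMetricsSourceTest now fetch configuration through TimelineMetricConfiguration.getInstance() rather than constructing it directly, so tests that need a known configuration pin the singleton with PowerMock. Below is that setup pattern condensed from the new RawMetricsSourceTest into a self-contained class; it assumes only the PowerMock/EasyMock APIs the test already imports.

import static org.easymock.EasyMock.expect;
import static org.powermock.api.easymock.PowerMock.mockStatic;
import static org.powermock.api.easymock.PowerMock.replayAll;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.junit.Before;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest(TimelineMetricConfiguration.class)
public class SingletonStubSketch {

  @Before
  public void pinConfigurationSingleton() throws Exception {
    // Build a throwaway configuration, then have the static getInstance()
    // hand it back for the duration of the test.
    TimelineMetricConfiguration conf =
        new TimelineMetricConfiguration(new Configuration(), new Configuration());
    mockStatic(TimelineMetricConfiguration.class);
    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
    replayAll();
  }
}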
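Editor's note: two call-site changes repeat across the test diffs: saveClusterTimeAggregateRecords(...) becomes saveClusterAggregateRecordsSecond(...), and TimelineMetricMetadataManager now takes (Configuration, PhoenixHBaseAccessor) rather than (PhoenixHBaseAccessor, Configuration). A small before/after sketch of the migrated call sites follows; the wrapper class and method names are hypothetical, while the Ambari types and the new signatures are taken from the diff.

import java.sql.SQLException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;

public class CallSiteMigrationSketch {

  static TimelineMetricMetadataManager newMetadataManager(PhoenixHBaseAccessor hdb) {
    // Old: new TimelineMetricMetadataManager(hdb, new Configuration());
    return new TimelineMetricMetadataManager(new Configuration(), hdb);
  }

  static void saveHourlyClusterAggregates(PhoenixHBaseAccessor hdb,
      Map<TimelineClusterMetric, MetricHostAggregate> records) throws SQLException {
    // Old: hdb.saveClusterTimeAggregateRecords(records, ...);
    hdb.saveClusterAggregateRecordsSecond(records,
        PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
  }
}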
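Editor's note: in the ITClusterAggregator appId-filter test, the condition's metric-name list changes from Collections.singletonList("cpu_user") to a freshly built ArrayList. The likely reason, although the diff does not state it, is that the query layer mutates that list in place, which an immutable singleton list would reject with UnsupportedOperationException. The standalone sketch below only contrasts the two list shapes; the class is hypothetical and independent of the Ambari code.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MetricNameListSketch {
  public static void main(String[] args) {
    // Immutable single-element view: any structural modification throws
    // UnsupportedOperationException.
    List<String> immutable = Collections.singletonList("cpu_user");

    // Mutable equivalent of the double-brace initializer used in the diff,
    // without the extra anonymous subclass.
    List<String> mutable = new ArrayList<>(Collections.singletonList("cpu_user"));
    mutable.add("cpu_system"); // fine here, would fail on the immutable view

    System.out.println(immutable + " / " + mutable);
  }
}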