This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 512ed81 ATLAS-3071: updated stats/metrics module to collect notification metrics 512ed81 is described below commit 512ed8181f9cbfc530dc9f9a38502e5daafd7a22 Author: Madhan Neethiraj <mad...@apache.org> AuthorDate: Thu Mar 21 16:35:12 2019 -0700 ATLAS-3071: updated stats/metrics module to collect notification metrics Co-authored-by: "lma <l...@cloudera.com>" (cherry picked from commit 1a5009ea7ec589e5809aae5ead8fac2467394f25) --- .../org/apache/atlas/model/AtlasStatistics.java | 79 ------ .../apache/atlas/model/metrics/AtlasMetrics.java | 57 ++++- .../org/apache/atlas/services/MetricsService.java | 116 ++++----- .../org/apache/atlas/util/AtlasMetricsCounter.java | 268 ++++++++++++++++++++ .../org/apache/atlas/util/AtlasMetricsUtil.java | 271 ++++++++++++++++++++ .../java/org/apache/atlas/util/StatisticsUtil.java | 274 --------------------- .../apache/atlas/services/MetricsServiceTest.java | 115 ++++++++- .../notification/NotificationHookConsumer.java | 107 ++++---- .../web/service/ActiveInstanceElectorService.java | 41 ++- .../NotificationHookConsumerKafkaTest.java | 10 +- .../notification/NotificationHookConsumerTest.java | 32 +-- .../service/ActiveInstanceElectorServiceTest.java | 32 +-- 12 files changed, 866 insertions(+), 536 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java b/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java deleted file mode 100644 index 0ecbd9a..0000000 --- a/intg/src/main/java/org/apache/atlas/model/AtlasStatistics.java +++ /dev/null @@ -1,79 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.model; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; - -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.NONE; -import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ONLY; - -/** - * Atlas statistics - */ -@JsonAutoDetect(getterVisibility = PUBLIC_ONLY, setterVisibility = PUBLIC_ONLY, fieldVisibility = NONE) -@JsonSerialize(include = JsonSerialize.Inclusion.ALWAYS) -@JsonIgnoreProperties(ignoreUnknown = true) -public class AtlasStatistics { - public static final String STAT_SERVER_START_TS = "Server:upFrom"; - public static final String STAT_SERVER_ACTIVE_TS = "Server:activateFrom"; - public static final String STAT_SERVER_UP_SINCE = "Server:upTime"; - public static final String STAT_START_OFFSET = "Notification:ATLAS_HOOK:offsetStart"; - public static final String STAT_CURRENT_OFFSET = "Notification:ATLAS_HOOK:offsetCurrent"; - public static final String STAT_SOLR_STATUS = "ConnectionStatus:Solr"; - public static final String STAT_HBASE_STATUS = "ConnectionStatus:HBase"; - public static final String STAT_LAST_MESSAGE_PROCESSED_TIME_TS = "Notification:ATLAS_HOOK:messageLastProcessedAt"; - public static final String STAT_AVG_MESSAGE_PROCESSING_TIME = "Notification:ATLAS_HOOK:messageAvgProcessingDuration"; - public static final String STAT_MESSAGES_CONSUMED = "Notification:ATLAS_HOOK:messagesConsumed"; - - private Map<String, Object> data = new HashMap<>(); - - public Map<String, Object> getData() { - return data; - } - - public void setData(Map<String, Object> data) { - this.data = data; - } - - @Override - public String toString() { - return "AtlasStatistics{" + - "data=" + data + - '}'; - } - - @Override - public int hashCode() { - return Objects.hash(data); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - AtlasStatistics other = (AtlasStatistics) o; - - return Objects.equals(this.data, other.data); - } -} diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java index c3304cc..6f7c914 100644 --- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java +++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java @@ -36,6 +36,51 @@ import static com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility.PUBLIC_ @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) @JsonIgnoreProperties(ignoreUnknown=true) public class AtlasMetrics { + public static final String PREFIX_CONNECTION_STATUS = "ConnectionStatus:"; + public static final String PREFIX_NOTIFICATION = "Notification:"; + public static final String PREFIX_SERVER = "Server:"; + + public static final String STAT_CONNECTION_STATUS_BACKEND_STORE = PREFIX_CONNECTION_STATUS + "backendStore"; + public static final String STAT_CONNECTION_STATUS_INDEX_STORE = PREFIX_CONNECTION_STATUS + "indexStore"; + public static final String STAT_NOTIFY_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDay"; + public static final String STAT_NOTIFY_AVG_TIME_CURR_DAY = PREFIX_NOTIFICATION + "currentDayAvgTime"; + public static final String STAT_NOTIFY_CREATES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityCreates"; + public static final String STAT_NOTIFY_UPDATES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityUpdates"; + public static final String STAT_NOTIFY_DELETES_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayEntityDeletes"; + public static final String STAT_NOTIFY_FAILED_COUNT_CURR_DAY = PREFIX_NOTIFICATION + "currentDayFailed"; + public static final String STAT_NOTIFY_START_TIME_CURR_DAY = PREFIX_NOTIFICATION + "currentDayStartTime"; + public static final String STAT_NOTIFY_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHour"; + public static final String STAT_NOTIFY_AVG_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourAvgTime"; + public static final String STAT_NOTIFY_CREATES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityCreates"; + public static final String STAT_NOTIFY_UPDATES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityUpdates"; + public static final String STAT_NOTIFY_DELETES_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourEntityDeletes"; + public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourFailed"; + public static final String STAT_NOTIFY_START_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourStartTime"; + public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME = PREFIX_NOTIFICATION + "lastMessageProcessedTime"; + public static final String STAT_NOTIFY_START_OFFSET = PREFIX_NOTIFICATION + "offsetStart"; + public static final String STAT_NOTIFY_CURRENT_OFFSET = PREFIX_NOTIFICATION + "offsetCurrent"; + public static final String STAT_NOTIFY_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDay"; + public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY = PREFIX_NOTIFICATION + "previousDayAvgTime"; + public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityCreates"; + public static final String STAT_NOTIFY_UPDATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityUpdates"; + public static final String STAT_NOTIFY_DELETES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityDeletes"; + public static final String STAT_NOTIFY_FAILED_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayFailed"; + public static final String STAT_NOTIFY_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHour"; + public static final String STAT_NOTIFY_AVG_TIME_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourAvgTime"; + public static final String STAT_NOTIFY_CREATES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityCreates"; + public static final String STAT_NOTIFY_UPDATES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityUpdates"; + public static final String STAT_NOTIFY_DELETES_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourEntityDeletes"; + public static final String STAT_NOTIFY_FAILED_COUNT_PREV_HOUR = PREFIX_NOTIFICATION + "previousHourFailed"; + public static final String STAT_NOTIFY_COUNT_TOTAL = PREFIX_NOTIFICATION + "total"; + public static final String STAT_NOTIFY_AVG_TIME_TOTAL = PREFIX_NOTIFICATION + "totalAvgTime"; + public static final String STAT_NOTIFY_CREATES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalCreates"; + public static final String STAT_NOTIFY_UPDATES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalUpdates"; + public static final String STAT_NOTIFY_DELETES_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalDeletes"; + public static final String STAT_NOTIFY_FAILED_COUNT_TOTAL = PREFIX_NOTIFICATION + "totalFailed"; + public static final String STAT_SERVER_ACTIVE_TIMESTAMP = PREFIX_SERVER + "activeTimeStamp"; + public static final String STAT_SERVER_START_TIMESTAMP = PREFIX_SERVER + "startTimeStamp"; + public static final String STAT_SERVER_UP_TIME = PREFIX_SERVER + "upTime"; + private Map<String, Map<String, Object>> data; public AtlasMetrics() { @@ -63,30 +108,38 @@ public class AtlasMetrics { @JsonIgnore public void addMetric(String groupKey, String key, Object value) { Map<String, Map<String, Object>> data = this.data; + if (data == null) { data = new HashMap<>(); + + this.data = data; } + Map<String, Object> metricMap = data.computeIfAbsent(groupKey, k -> new HashMap<>()); + metricMap.put(key, value); - setData(data); } @JsonIgnore public Number getNumericMetric(String groupKey, String key) { Object metric = getMetric(groupKey, key); + return metric instanceof Number ? (Number) metric : null; } @JsonIgnore public Object getMetric(String groupKey, String key) { + Object ret = null; Map<String, Map<String, Object>> data = this.data; - Object ret = null; + if (data != null) { Map<String, Object> metricMap = data.get(groupKey); + if (metricMap != null && !metricMap.isEmpty()) { ret = metricMap.get(key); } } + return ret; } } diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java index d9ea12a..8fb68e9 100644 --- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java +++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java @@ -18,15 +18,13 @@ package org.apache.atlas.services; import org.apache.atlas.annotation.AtlasService; -import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.instance.AtlasEntity.Status; import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; import org.apache.atlas.type.AtlasTypeRegistry; -import org.apache.atlas.util.StatisticsUtil; +import org.apache.atlas.util.AtlasMetricsUtil; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +51,8 @@ public class MetricsService { public static final String GENERAL = "general"; // Query names + protected static final String METRIC_COLLECTION_TIME = "collectionTime"; + protected static final String METRIC_STATS = "stats"; protected static final String METRIC_TYPE_COUNT = TYPE + "Count"; protected static final String METRIC_TYPE_UNUSED_COUNT = TYPE + "UnusedCount"; protected static final String METRIC_ENTITY_COUNT = ENTITY + "Count"; @@ -61,114 +61,90 @@ public class MetricsService { protected static final String METRIC_TAG_COUNT = TAG + "Count"; protected static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities"; - public static final String METRIC_COLLECTION_TIME = "collectionTime"; - private final AtlasGraph atlasGraph; private final AtlasTypeRegistry typeRegistry; - private final StatisticsUtil statisticsUtil; + private final AtlasMetricsUtil metricsUtil; private final String indexSearchPrefix = AtlasGraphUtilsV2.getIndexSearchPrefix(); @Inject - public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) { + public MetricsService(final AtlasGraph graph, final AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) { this.atlasGraph = graph; this.typeRegistry = typeRegistry; - this.statisticsUtil = statisticsUtil; + this.metricsUtil = metricsUtil; } @SuppressWarnings("unchecked") public AtlasMetrics getMetrics() { - AtlasMetrics metrics = new AtlasMetrics(); - - metrics.addMetric(GENERAL, METRIC_TYPE_COUNT, getAllTypesCount()); - metrics.addMetric(GENERAL, METRIC_TAG_COUNT, getAllTagsCount()); - - Map<String, Long> activeCountMap = new HashMap<>(); - Map<String, Long> deletedCountMap = new HashMap<>(); - - // metrics for classifications + Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames(); Collection<String> classificationDefNames = typeRegistry.getAllClassificationDefNames(); - - if (classificationDefNames != null) { - for (String classificationDefName : classificationDefNames) { - activeCountMap.put(classificationDefName, getTypeCount(classificationDefName, ACTIVE)); - } - } - - // metrics for entities - Collection<String> entityDefNames = typeRegistry.getAllEntityDefNames(); + Map<String, Long> activeEntityCount = new HashMap<>(); + Map<String, Long> deletedEntityCount = new HashMap<>(); + Map<String, Long> taggedEntityCount = new HashMap<>(); + long unusedTypeCount = 0; + long totalEntities = 0; if (entityDefNames != null) { for (String entityDefName : entityDefNames) { - activeCountMap.put(entityDefName, getTypeCount(entityDefName, ACTIVE)); - deletedCountMap.put(entityDefName, getTypeCount(entityDefName, DELETED)); + long activeCount = getTypeCount(entityDefName, ACTIVE); + long deletedCount = getTypeCount(entityDefName, DELETED); + + if (activeCount > 0) { + activeEntityCount.put(entityDefName, activeCount); + totalEntities += activeCount; + } + + if (deletedCount > 0) { + deletedEntityCount.put(entityDefName, deletedCount); + totalEntities += deletedCount; + } + + if (activeCount == 0 && deletedCount == 0) { + unusedTypeCount++; + } } } - Map<String, Long> activeEntityCount = new HashMap<>(); - Map<String, Long> deletedEntityCount = new HashMap<>(); - long unusedTypeCount = 0; - long totalEntities = 0; - - for (String entityDefName : typeRegistry.getAllEntityDefNames()) { - Long activeCount = activeCountMap.get(entityDefName); - Long deletedCount = deletedCountMap.get(entityDefName); - - if (activeCount > 0) { - activeEntityCount.put(entityDefName, activeCount); - totalEntities += activeCount.longValue(); - } - - if (deletedCount > 0) { - deletedEntityCount.put(entityDefName, deletedCount); - totalEntities += deletedCount.longValue(); - } + if (classificationDefNames != null) { + for (String classificationDefName : classificationDefNames) { + long count = getTypeCount(classificationDefName, ACTIVE); - if (activeCount == 0 && deletedCount == 0) { - unusedTypeCount++; + if (count > 0) { + taggedEntityCount.put(classificationDefName, count); + } } } + AtlasMetrics metrics = new AtlasMetrics(); + + metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, System.currentTimeMillis()); + metrics.addMetric(GENERAL, METRIC_STATS, metricsUtil.getStats()); //add atlas server stats + metrics.addMetric(GENERAL, METRIC_TYPE_COUNT, getAllTypesCount()); + metrics.addMetric(GENERAL, METRIC_TAG_COUNT, getAllTagsCount()); metrics.addMetric(GENERAL, METRIC_TYPE_UNUSED_COUNT, unusedTypeCount); metrics.addMetric(GENERAL, METRIC_ENTITY_COUNT, totalEntities); + metrics.addMetric(ENTITY, METRIC_ENTITY_ACTIVE, activeEntityCount); metrics.addMetric(ENTITY, METRIC_ENTITY_DELETED, deletedEntityCount); - Map<String, Long> taggedEntityCount = new HashMap<>(); - - for (String classificationName : typeRegistry.getAllClassificationDefNames()) { - Long count = activeCountMap.get(classificationName); - - if (count > 0) { - taggedEntityCount.put(classificationName, count); - } - } - metrics.addMetric(TAG, METRIC_ENTITIES_PER_TAG, taggedEntityCount); - // Miscellaneous metrics - long collectionTime = System.currentTimeMillis(); - - metrics.addMetric(GENERAL, METRIC_COLLECTION_TIME, collectionTime); - - //add atlas server stats - Map<String, Object> statistics = statisticsUtil.getAtlasStatistics(); - metrics.addMetric(GENERAL, "stats", statistics); - return metrics; } - private Long getTypeCount(String typeName, Status status) { + private long getTypeCount(String typeName, Status status) { + Long ret = null; String indexQuery = indexSearchPrefix + "\"" + ENTITY_TYPE_PROPERTY_KEY + "\" : (%s)" + AND_STR + indexSearchPrefix + "\"" + STATE_PROPERTY_KEY + "\" : (%s)"; indexQuery = String.format(indexQuery, typeName, status.name()); try { - return atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals(); + ret = atlasGraph.indexQuery(VERTEX_INDEX, indexQuery).vertexTotals(); }catch (Exception e){ LOG.error("Failed fetching using indexQuery: " + e.getMessage()); } - return 0l; + + return ret == null ? 0L : ret; } private int getAllTypesCount() { diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java new file mode 100644 index 0000000..acf9e34 --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.util; + + +import java.time.Clock; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.ZoneOffset; + +import static org.apache.atlas.util.AtlasMetricsCounter.Period.*; + +public class AtlasMetricsCounter { + public enum Period { ALL, CURR_DAY, CURR_HOUR, PREV_HOUR, PREV_DAY }; + + private final String name; + private final Stats stats; + private Clock clock; + private Instant lastIncrTime; + private Instant dayStartTime; + private Instant dayEndTime; + private Instant hourStartTime; + private Instant hourEndTime; + + public AtlasMetricsCounter(String name) { + this(name, Clock.systemUTC()); + } + + public AtlasMetricsCounter(String name, Clock clock) { + this.name = name; + this.stats = new Stats(); + + init(clock); + } + + public String getName() { return name; } + + public Instant getLastIncrTime() { return lastIncrTime; } + + public void incr() { + incrByWithMeasure(1, 0); + } + + public void incrBy(long count) { + incrByWithMeasure(count, 0); + } + + public void incrWithMeasure(long measure) { + incrByWithMeasure(1, measure); + } + + public void incrByWithMeasure(long count, long measure) { + Instant instant = clock.instant(); + + stats.addCount(ALL, count); + stats.addMeasure(ALL, measure); + + if (instant.isAfter(dayStartTime)) { // ignore times earlier than start of current day + lastIncrTime = instant; + + updateForTime(instant); + + stats.addCount(CURR_DAY, count); + stats.addMeasure(CURR_DAY, measure); + + if (instant.isAfter(hourStartTime)) { // ignore times earlier than start of current hour + stats.addCount(CURR_HOUR, count); + stats.addMeasure(CURR_HOUR, measure); + } + } + } + + public Stats report() { + updateForTime(clock.instant()); + + return new Stats(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli()); + } + + // visible only for testing + void init(Clock clock) { + this.clock = clock; + this.lastIncrTime = Instant.ofEpochSecond(0); + this.dayStartTime = Instant.ofEpochSecond(0); + this.dayEndTime = Instant.ofEpochSecond(0); + this.hourStartTime = Instant.ofEpochSecond(0); + this.hourEndTime = Instant.ofEpochSecond(0); + + updateForTime(clock.instant()); + } + + protected void updateForTime(Instant instant) { + if (instant.isAfter(dayEndTime)) { + rolloverDay(instant); + rolloverHour(instant); + } else if (instant.isAfter(hourEndTime)) { + rolloverHour(instant); + } + } + + protected void rolloverDay(Instant instant) { + Instant dayStartTime = getDayStartTime(instant); + + if (dayStartTime.equals(dayEndTime)) { + stats.copy(CURR_DAY, PREV_DAY); + } else { + stats.reset(PREV_DAY); + } + + stats.reset(CURR_DAY); + + this.dayStartTime = dayStartTime; + this.dayEndTime = getNextDayStartTime(instant); + } + + protected void rolloverHour(Instant instant) { + Instant hourStartTime = getHourStartTime(instant); + + if (hourStartTime.equals(hourEndTime)) { + stats.copy(CURR_HOUR, PREV_HOUR); + } else { + stats.reset(PREV_HOUR); + } + + stats.reset(CURR_HOUR); + + this.hourStartTime = hourStartTime; + this.hourEndTime = getNextHourStartTime(instant); + } + + public static LocalDateTime getLocalDateTime(Instant instant) { + return LocalDateTime.ofInstant(instant, ZoneOffset.UTC); + } + + public static Instant getHourStartTime(Instant instant) { + LocalDateTime time = getLocalDateTime(instant); + + return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour()).toInstant(ZoneOffset.UTC); + } + + public static Instant getNextHourStartTime(Instant instant) { + LocalDateTime time = getLocalDateTime(instant); + + return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).plusHours(time.getHour() + 1).toInstant(ZoneOffset.UTC); + } + + public static Instant getDayStartTime(Instant instant) { + LocalDateTime time = getLocalDateTime(instant); + + return LocalDateTime.of(time.toLocalDate(), LocalTime.MIN).toInstant(ZoneOffset.UTC); + } + + public static Instant getNextDayStartTime(Instant instant) { + LocalDateTime time = getLocalDateTime(instant); + + return LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.MIN).toInstant(ZoneOffset.UTC); + } + + + public static class Stats { + private static final int NUM_PERIOD = Period.values().length; + + private final long dayStartTimeMs; + private final long hourStartTimeMs; + private final long[] count = new long[NUM_PERIOD]; + private final long[] measureSum = new long[NUM_PERIOD]; + private final long[] measureMin = new long[NUM_PERIOD]; + private final long[] measureMax = new long[NUM_PERIOD]; + + + public Stats() { + dayStartTimeMs = 0; + hourStartTimeMs = 0; + + for (Period period : Period.values()) { + reset(period); + } + } + + public Stats(Stats other, long dayStartTimeMs, long hourStartTimeMs) { + this.dayStartTimeMs = dayStartTimeMs; + this.hourStartTimeMs = hourStartTimeMs; + + copy(other.count, this.count); + copy(other.measureSum, this.measureSum); + copy(other.measureMin, this.measureMin); + copy(other.measureMax, this.measureMax); + } + + public long getDayStartTimeMs() { return dayStartTimeMs; } + + public long getHourStartTimeMs() { return hourStartTimeMs; } + + public long getCount(Period period) { return count[period.ordinal()]; } + + public long getMeasureSum(Period period) { return measureSum[period.ordinal()]; } + + public long getMeasureMin(Period period) { return measureMin[period.ordinal()]; } + + public long getMeasureMax(Period period) { return measureMax[period.ordinal()]; } + + public long getMeasureAvg(Period period) { + int idx = period.ordinal(); + long c = count[idx]; + + return c != 0 ? (measureSum[idx] / c) : 0; + } + + public void addCount(Period period, long num) { + count[period.ordinal()] += num; + } + + public void addMeasure(Period period, long measure) { + int idx = period.ordinal(); + + measureSum[idx] += measure; + + if (measureMin[idx] > measure) { + measureMin[idx] = measure; + } + + if (measureMax[idx] < measure) { + measureMax[idx] = measure; + } + } + + private void copy(Period src, Period dest) { + int srcIdx = src.ordinal(); + int destIdx = dest.ordinal(); + + count[destIdx] = count[srcIdx]; + measureSum[destIdx] = measureSum[srcIdx]; + measureMin[destIdx] = measureMin[srcIdx]; + measureMax[destIdx] = measureMax[srcIdx]; + } + + private void reset(Period period) { + int idx = period.ordinal(); + + count[idx] = 0; + measureSum[idx] = 0; + measureMin[idx] = Long.MAX_VALUE; + measureMax[idx] = Long.MIN_VALUE; + } + + private void copy(long[] src, long[] dest) { + for (int i = 0; i < dest.length; i++) { + dest[i] = src[i]; + } + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java new file mode 100644 index 0000000..c41e6bd --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.util; + +import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.repository.Constants; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; +import org.apache.atlas.util.AtlasMetricsCounter.Stats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.stereotype.Component; + +import javax.inject.Inject; +import java.time.Clock; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; + +import static org.apache.atlas.model.metrics.AtlasMetrics.*; +import static org.apache.atlas.repository.Constants.TYPE_NAME_INTERNAL; +import static org.apache.atlas.repository.Constants.TYPE_NAME_PROPERTY_KEY; +import static org.apache.atlas.util.AtlasMetricsCounter.Period.*; + +@Component +public class AtlasMetricsUtil { + private static final Logger LOG = LoggerFactory.getLogger(AtlasMetricsUtil.class); + + private static final long SEC_MS = 1000; + private static final long MIN_MS = 60 * SEC_MS; + private static final long HOUR_MS = 60 * MIN_MS; + private static final long DAY_MS = 24 * HOUR_MS; + private static final String STATUS_CONNECTED = "connected"; + private static final String STATUS_NOT_CONNECTED = "not-connected"; + + private final AtlasGraph graph; + private long serverStartTime = 0; + private long serverActiveTime = 0; + private long msgOffsetStart = -1; + private long msgOffsetCurrent = 0; + private final AtlasMetricsCounter messagesProcessed = new AtlasMetricsCounter("messagesProcessed"); + private final AtlasMetricsCounter messagesFailed = new AtlasMetricsCounter("messagesFailed"); + private final AtlasMetricsCounter entityCreates = new AtlasMetricsCounter("entityCreates"); + private final AtlasMetricsCounter entityUpdates = new AtlasMetricsCounter("entityUpdates"); + private final AtlasMetricsCounter entityDeletes = new AtlasMetricsCounter("entityDeletes"); + + @Inject + public AtlasMetricsUtil(AtlasGraph graph) { + this.graph = graph; + } + + // visible only for testing + public void init(Clock clock) { + messagesProcessed.init(clock); + messagesFailed.init(clock); + entityCreates.init(clock); + entityUpdates.init(clock); + entityDeletes.init(clock); + } + + public void onServerStart() { + serverStartTime = System.currentTimeMillis(); + } + + public void onServerActivation() { + serverActiveTime = System.currentTimeMillis(); + } + + public void onNotificationProcessingComplete(long msgOffset, NotificationStat stats) { + messagesProcessed.incrWithMeasure(stats.timeTakenMs); + entityCreates.incrBy(stats.entityCreates); + entityUpdates.incrBy(stats.entityUpdates); + entityDeletes.incrBy(stats.entityDeletes); + + if (stats.isFailedMsg) { + messagesFailed.incr(); + } + + if (msgOffsetStart == -1) { + msgOffsetStart = msgOffset; + } + + msgOffsetCurrent = ++msgOffset; + } + + public Map<String, Object> getStats() { + Map<String, Object> ret = new HashMap<>(); + + Stats messagesProcessed = this.messagesProcessed.report(); + Stats messagesFailed = this.messagesFailed.report(); + Stats entityCreates = this.entityCreates.report(); + Stats entityUpdates = this.entityUpdates.report(); + Stats entityDeletes = this.entityDeletes.report(); + + ret.put(STAT_SERVER_START_TIMESTAMP, serverStartTime); + ret.put(STAT_SERVER_ACTIVE_TIMESTAMP, serverActiveTime); + ret.put(STAT_SERVER_UP_TIME, millisToTimeDiff(System.currentTimeMillis() - serverStartTime)); + ret.put(STAT_CONNECTION_STATUS_BACKEND_STORE, getHBaseStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED); + ret.put(STAT_CONNECTION_STATUS_INDEX_STORE, getSolrStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED); + ret.put(STAT_NOTIFY_START_OFFSET, msgOffsetStart); + ret.put(STAT_NOTIFY_CURRENT_OFFSET, msgOffsetCurrent); + ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME, this.messagesProcessed.getLastIncrTime().toEpochMilli()); + + ret.put(STAT_NOTIFY_COUNT_TOTAL, messagesProcessed.getCount(ALL)); + ret.put(STAT_NOTIFY_AVG_TIME_TOTAL, messagesProcessed.getMeasureAvg(ALL)); + ret.put(STAT_NOTIFY_FAILED_COUNT_TOTAL, messagesFailed.getCount(ALL)); + ret.put(STAT_NOTIFY_CREATES_COUNT_TOTAL, entityCreates.getCount(ALL)); + ret.put(STAT_NOTIFY_UPDATES_COUNT_TOTAL, entityUpdates.getCount(ALL)); + ret.put(STAT_NOTIFY_DELETES_COUNT_TOTAL, entityDeletes.getCount(ALL)); + + ret.put(STAT_NOTIFY_START_TIME_CURR_DAY, messagesProcessed.getDayStartTimeMs()); + ret.put(STAT_NOTIFY_COUNT_CURR_DAY, messagesProcessed.getCount(CURR_DAY)); + ret.put(STAT_NOTIFY_AVG_TIME_CURR_DAY, messagesProcessed.getMeasureAvg(CURR_DAY)); + ret.put(STAT_NOTIFY_FAILED_COUNT_CURR_DAY, messagesFailed.getCount(CURR_DAY)); + ret.put(STAT_NOTIFY_CREATES_COUNT_CURR_DAY, entityCreates.getCount(CURR_DAY)); + ret.put(STAT_NOTIFY_UPDATES_COUNT_CURR_DAY, entityUpdates.getCount(CURR_DAY)); + ret.put(STAT_NOTIFY_DELETES_COUNT_CURR_DAY, entityDeletes.getCount(CURR_DAY)); + + ret.put(STAT_NOTIFY_START_TIME_CURR_HOUR, messagesProcessed.getHourStartTimeMs()); + ret.put(STAT_NOTIFY_COUNT_CURR_HOUR, messagesProcessed.getCount(CURR_HOUR)); + ret.put(STAT_NOTIFY_AVG_TIME_CURR_HOUR, messagesProcessed.getMeasureAvg(CURR_HOUR)); + ret.put(STAT_NOTIFY_FAILED_COUNT_CURR_HOUR, messagesFailed.getCount(CURR_HOUR)); + ret.put(STAT_NOTIFY_CREATES_COUNT_CURR_HOUR, entityCreates.getCount(CURR_HOUR)); + ret.put(STAT_NOTIFY_UPDATES_COUNT_CURR_HOUR, entityUpdates.getCount(CURR_HOUR)); + ret.put(STAT_NOTIFY_DELETES_COUNT_CURR_HOUR, entityDeletes.getCount(CURR_HOUR)); + + ret.put(STAT_NOTIFY_COUNT_PREV_HOUR, messagesProcessed.getCount(PREV_HOUR)); + ret.put(STAT_NOTIFY_AVG_TIME_PREV_HOUR, messagesProcessed.getMeasureAvg(PREV_HOUR)); + ret.put(STAT_NOTIFY_FAILED_COUNT_PREV_HOUR, messagesFailed.getCount(PREV_HOUR)); + ret.put(STAT_NOTIFY_CREATES_COUNT_PREV_HOUR, entityCreates.getCount(PREV_HOUR)); + ret.put(STAT_NOTIFY_UPDATES_COUNT_PREV_HOUR, entityUpdates.getCount(PREV_HOUR)); + ret.put(STAT_NOTIFY_DELETES_COUNT_PREV_HOUR, entityDeletes.getCount(PREV_HOUR)); + + ret.put(STAT_NOTIFY_COUNT_PREV_DAY, messagesProcessed.getCount(PREV_DAY)); + ret.put(STAT_NOTIFY_AVG_TIME_PREV_DAY, messagesProcessed.getMeasureAvg(PREV_DAY)); + ret.put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, messagesFailed.getCount(PREV_DAY)); + ret.put(STAT_NOTIFY_CREATES_COUNT_PREV_DAY, entityCreates.getCount(PREV_DAY)); + ret.put(STAT_NOTIFY_UPDATES_COUNT_PREV_DAY, entityUpdates.getCount(PREV_DAY)); + ret.put(STAT_NOTIFY_DELETES_COUNT_PREV_DAY, entityDeletes.getCount(PREV_DAY)); + + return ret; + } + + private boolean getHBaseStatus(){ + try { + runWithTimeout(new Runnable() { + @Override + public void run() { + graph.query().has(TYPE_NAME_PROPERTY_KEY, TYPE_NAME_INTERNAL).vertices(1); + } + }, 10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error(e.getMessage()); + return false; + } + + return true; + } + + private boolean getSolrStatus(){ + final String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + Constants.TYPE_NAME_PROPERTY_KEY + "\":(" + TYPE_NAME_INTERNAL + ")"; + + try { + runWithTimeout(new Runnable() { + @Override + public void run() { + graph.indexQuery(Constants.VERTEX_INDEX, query).vertices(0, 1); + } + }, 10, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error(e.getMessage()); + return false; + } + + return true; + } + + private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception { + runWithTimeout(new Callable<Object>() { + @Override + public Object call() { + runnable.run(); + return null; + } + }, timeout, timeUnit); + } + + private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception { + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final Future<T> future = executor.submit(callable); + + executor.shutdown(); + + try { + return future.get(timeout, timeUnit); + } catch (TimeoutException e) { + future.cancel(true); + + throw e; + } catch (ExecutionException e) { + Throwable t = e.getCause(); + + if (t instanceof Error) { + throw (Error) t; + } else if (t instanceof Exception) { + throw (Exception) t; + } else { + throw new IllegalStateException(t); + } + } + } + + private String millisToTimeDiff(long msDiff) { + StringBuilder sb = new StringBuilder(); + + long diffSeconds = msDiff / SEC_MS % 60; + long diffMinutes = msDiff / MIN_MS % 60; + long diffHours = msDiff / HOUR_MS % 24; + long diffDays = msDiff / DAY_MS; + + if (diffDays > 0) sb.append(diffDays).append(" day "); + if (diffHours > 0) sb.append(diffHours).append(" hour "); + if (diffMinutes > 0) sb.append(diffMinutes).append(" min "); + if (diffSeconds > 0) sb.append(diffSeconds).append(" sec"); + + return sb.toString(); + } + + public static class NotificationStat { + public boolean isFailedMsg = false; + public long timeTakenMs = 0; + public int entityCreates = 0; + public int entityUpdates = 0; + public int entityDeletes = 0; + + public NotificationStat() { } + + public NotificationStat(boolean isFailedMsg, long timeTakenMs) { + this.isFailedMsg = isFailedMsg; + this.timeTakenMs = timeTakenMs; + } + + public void updateStats(EntityMutationResponse response) { + entityCreates += getSize(response.getCreatedEntities()); + entityUpdates += getSize(response.getUpdatedEntities()); + entityUpdates += getSize(response.getPartialUpdatedEntities()); + entityDeletes += getSize(response.getDeletedEntities()); + } + + private int getSize(Collection collection) { + return collection != null ? collection.size() : 0; + } + } +} diff --git a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java b/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java deleted file mode 100644 index efb804b..0000000 --- a/repository/src/main/java/org/apache/atlas/util/StatisticsUtil.java +++ /dev/null @@ -1,274 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.atlas.util; - -import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.model.AtlasStatistics; -import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.graphdb.AtlasGraph; -import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.stereotype.Component; - -import javax.inject.Inject; -import java.text.NumberFormat; -import java.text.SimpleDateFormat; -import java.util.HashMap; -import java.util.Map; -import java.util.Locale; -import java.util.concurrent.*; - -import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_START_TS; -import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_ACTIVE_TS; -import static org.apache.atlas.model.AtlasStatistics.STAT_SERVER_UP_SINCE; -import static org.apache.atlas.model.AtlasStatistics.STAT_START_OFFSET; -import static org.apache.atlas.model.AtlasStatistics.STAT_CURRENT_OFFSET; -import static org.apache.atlas.model.AtlasStatistics.STAT_SOLR_STATUS; -import static org.apache.atlas.model.AtlasStatistics.STAT_HBASE_STATUS; -import static org.apache.atlas.model.AtlasStatistics.STAT_LAST_MESSAGE_PROCESSED_TIME_TS; -import static org.apache.atlas.model.AtlasStatistics.STAT_AVG_MESSAGE_PROCESSING_TIME; -import static org.apache.atlas.model.AtlasStatistics.STAT_MESSAGES_CONSUMED; - -@Component -public class StatisticsUtil { - private static final Logger LOG = LoggerFactory.getLogger(StatisticsUtil.class); - - private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("d MMM, yyyy : hh:mm aaa z"); - - private static final long DAY = 1000 * 60 * 60 * 24; - private static final long HOUR = 1000 * 60 * 60; - private static final long MIN = 1000 * 60; - private static final long SEC = 1000; - - private final AtlasGraph graph; - private final String STATUS_CONNECTED = "connected"; - private final String STATUS_NOT_CONNECTED = "not-connected"; - private final AtlasStatistics atlasStatistics; - - private long countMsgProcessed = 0; - private long totalMsgProcessingTimeMs = 0; - private Locale locale = new Locale("en", "US"); - private NumberFormat numberFormat; - - @Inject - public StatisticsUtil(AtlasGraph graph) { - this.graph = graph; - this.atlasStatistics = new AtlasStatistics(); - numberFormat = NumberFormat.getInstance(locale); - } - - public Map<String, Object> getAtlasStatistics() { - Map<String, Object> statisticsMap = new HashMap<>(); - statisticsMap.putAll(atlasStatistics.getData()); - - statisticsMap.put(STAT_HBASE_STATUS, getHBaseStatus()); - statisticsMap.put(STAT_SOLR_STATUS, getSolrStatus()); - statisticsMap.put(STAT_SERVER_UP_SINCE, getUpSinceTime()); - if(countMsgProcessed > 0) { - statisticsMap.put(STAT_MESSAGES_CONSUMED, countMsgProcessed); - } - formatStatistics(statisticsMap); - - return statisticsMap; - } - - public void setKafkaOffsets(long value) { - if (Long.parseLong(getStat(STAT_START_OFFSET).toString()) == -1) { - addStat(STAT_START_OFFSET, value); - } - addStat(STAT_CURRENT_OFFSET, ++value); - } - - public void setAvgMsgProcessingTime(long value) { - countMsgProcessed++; - totalMsgProcessingTimeMs += value; - value = totalMsgProcessingTimeMs / countMsgProcessed; - - addStat(STAT_AVG_MESSAGE_PROCESSING_TIME, value); - } - - public void setLastMsgProcessedTime() { - addStat(STAT_LAST_MESSAGE_PROCESSED_TIME_TS, System.currentTimeMillis()); - } - - public void setServerStartTime() { - addStat(STAT_SERVER_START_TS, System.currentTimeMillis()); - } - - public void setServerActiveTime() { - addStat(STAT_SERVER_ACTIVE_TS, System.currentTimeMillis()); - } - - - private void addStat(String key, Object value) { - Map<String, Object> data = atlasStatistics.getData(); - if (data == null) { - data = new HashMap<>(); - } - data.put(key, value); - atlasStatistics.setData(data); - } - - private Object getStat(String key) { - Map<String, Object> data = atlasStatistics.getData(); - Object ret = data.get(key); - if (ret == null) { - return -1; - } - return ret; - } - - private void formatStatistics(Map<String, Object> statisticsMap) { - for (Map.Entry<String, Object> stat : statisticsMap.entrySet()) { - switch (stat.getKey()) { - case STAT_SERVER_UP_SINCE: - statisticsMap.put(stat.getKey(), millisToTimeDiff(Long.parseLong(stat.getValue().toString()))); - break; - - case STAT_LAST_MESSAGE_PROCESSED_TIME_TS: - statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString()))); - break; - - case STAT_SERVER_START_TS: - case STAT_SERVER_ACTIVE_TS: - statisticsMap.put(stat.getKey(), millisToTimeStamp(Long.parseLong(stat.getValue().toString()))); - break; - - case STAT_AVG_MESSAGE_PROCESSING_TIME: - statisticsMap.put(stat.getKey(), formatNumber(Long.parseLong(stat.getValue().toString())) + " milliseconds"); - break; - - case STAT_HBASE_STATUS: - case STAT_SOLR_STATUS: - String curState = ((boolean) stat.getValue()) ? STATUS_CONNECTED : STATUS_NOT_CONNECTED; - statisticsMap.put(stat.getKey(), curState); - break; - - case STAT_MESSAGES_CONSUMED: - case STAT_START_OFFSET: - case STAT_CURRENT_OFFSET: - statisticsMap.put(stat.getKey(), formatNumber(Long.parseLong(stat.getValue().toString()))); - break; - - default: - statisticsMap.put(stat.getKey(), stat.getValue()); - } - } - } - - private boolean getHBaseStatus() { - - String query = "g.V().next()"; - try { - runWithTimeout(new Runnable() { - @Override - public void run() { - try { - graph.executeGremlinScript(query, false); - } catch (AtlasBaseException e) { - LOG.error(e.getMessage()); - } - } - }, 10, TimeUnit.SECONDS); - } catch (Exception e) { - LOG.error(e.getMessage()); - return false; - } - - return true; - } - - private boolean getSolrStatus() { - String query = AtlasGraphUtilsV2.getIndexSearchPrefix() + "\"" + "__type.name\"" + " : (*)"; - try { - runWithTimeout(new Runnable() { - @Override - public void run() { - graph.indexQuery(Constants.VERTEX_INDEX, query).vertexTotals(); - } - }, 10, TimeUnit.SECONDS); - } catch (Exception e) { - LOG.error(e.getMessage()); - return false; - } - return true; - } - - private void runWithTimeout(final Runnable runnable, long timeout, TimeUnit timeUnit) throws Exception { - runWithTimeout(new Callable<Object>() { - @Override - public Object call() { - runnable.run(); - return null; - } - }, timeout, timeUnit); - } - - private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception { - final ExecutorService executor = Executors.newSingleThreadExecutor(); - final Future<T> future = executor.submit(callable); - executor.shutdown(); - try { - return future.get(timeout, timeUnit); - } catch (TimeoutException e) { - future.cancel(true); - throw e; - } catch (ExecutionException e) { - Throwable t = e.getCause(); - if (t instanceof Error) { - throw (Error) t; - } else if (t instanceof Exception) { - throw (Exception) t; - } else { - throw new IllegalStateException(t); - } - } - } - - private long getUpSinceTime() { - long upTS = Long.parseLong(getStat(STAT_SERVER_START_TS).toString()); - return System.currentTimeMillis() - upTS; - } - - private String millisToTimeDiff(long msDiff) { - StringBuilder sb = new StringBuilder(); - - long diffSeconds = msDiff / SEC % 60; - long diffMinutes = msDiff / MIN % 60; - long diffHours = msDiff / HOUR % 24; - long diffDays = msDiff / DAY; - - if (diffDays > 0) sb.append(diffDays).append(" day "); - if (diffHours > 0) sb.append(diffHours).append(" hour "); - if (diffMinutes > 0) sb.append(diffMinutes).append(" min "); - if (diffSeconds > 0) sb.append(diffSeconds).append(" sec"); - - return sb.toString(); - } - - private String millisToTimeStamp(long ms) { - return simpleDateFormat.format(ms); - } - - private String formatNumber(long value) { - return numberFormat.format(value); - } - -} diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java index 78e5803..64698c2 100644 --- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java @@ -28,6 +28,8 @@ import org.apache.atlas.repository.impexp.ZipSource; import org.apache.atlas.runner.LocalSolrRunner; import org.apache.atlas.store.AtlasTypeDefStore; import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.util.AtlasMetricsCounter; +import org.apache.atlas.util.AtlasMetricsUtil; import org.testng.SkipException; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -37,10 +39,15 @@ import org.testng.annotations.Test; import javax.inject.Inject; import java.io.FileInputStream; import java.io.IOException; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZoneOffset; import java.util.HashMap; import java.util.Map; import static org.apache.atlas.graph.GraphSandboxUtil.useLocalSolr; +import static org.apache.atlas.model.metrics.AtlasMetrics.*; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.loadModelFromJson; import static org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImportWithNoParameters; import static org.apache.atlas.services.MetricsService.ENTITY; @@ -53,11 +60,11 @@ import static org.apache.atlas.services.MetricsService.METRIC_TAG_COUNT; import static org.apache.atlas.services.MetricsService.METRIC_TYPE_COUNT; import static org.apache.atlas.services.MetricsService.METRIC_TYPE_UNUSED_COUNT; import static org.apache.atlas.services.MetricsService.TAG; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertEquals; +import static org.testng.Assert.*; @Guice(modules = TestModules.TestOnlyModule.class) public class MetricsServiceTest { + public static final String IMPORT_FILE = "metrics-entities-data.zip"; @Inject @@ -72,6 +79,14 @@ public class MetricsServiceTest { @Inject private MetricsService metricsService; + @Inject + private AtlasMetricsUtil metricsUtil; + + TestClock clock = new TestClock(Clock.systemUTC(), ZoneOffset.UTC); + + long msgOffset = 0; + + private final Map<String, Long> activeEntityMetricsExpected = new HashMap<String, Long>() {{ put("hive_storagedesc", 5L); put("__ExportImportAuditEntry", 1L); @@ -95,6 +110,17 @@ public class MetricsServiceTest { put("PII", 1L); }}; + private final Map<String, Object> metricExpected = new HashMap<String, Object>() {{ + put(STAT_NOTIFY_COUNT_CURR_HOUR, 11L); + put(STAT_NOTIFY_FAILED_COUNT_CURR_HOUR, 1L); + put(STAT_NOTIFY_COUNT_PREV_HOUR, 11L); + put(STAT_NOTIFY_FAILED_COUNT_PREV_HOUR, 1L); + put(STAT_NOTIFY_COUNT_CURR_DAY, 33L); + put(STAT_NOTIFY_FAILED_COUNT_CURR_DAY, 3L); + put(STAT_NOTIFY_COUNT_PREV_DAY, 11L); + put(STAT_NOTIFY_FAILED_COUNT_PREV_DAY, 1L); + }}; + @BeforeClass public void setup() { RequestContext.clear(); @@ -148,6 +174,24 @@ public class MetricsServiceTest { assertEquals(deletedEntityMetricsActual, deletedEntityMetricsExpected); } + @Test + public void testNotificationMetrics() { + Instant now = Clock.systemUTC().instant(); + Instant dayStartTime = AtlasMetricsCounter.getDayStartTime(now); + Instant dayEndTime = AtlasMetricsCounter.getNextDayStartTime(now); + Instant hourStartTime = dayEndTime.minusSeconds(60 * 60); + + prepareNotificationData(dayStartTime, hourStartTime); + + clock.setInstant(dayEndTime.minusSeconds(1)); + + Map<String, Object> notificationMetricMap = metricsUtil.getStats(); + + clock.setInstant(null); + + verifyNotificationMetric(metricExpected, notificationMetricMap); + } + private void loadModelFilesAndImportTestData() { try { @@ -165,8 +209,75 @@ public class MetricsServiceTest { } } + private void prepareNotificationData(Instant dayStartTime, Instant hourStartTime) { + Instant prevDayStartTime = AtlasMetricsCounter.getDayStartTime(dayStartTime.minusSeconds(1)); + + msgOffset = 0; + + clock.setInstant(prevDayStartTime); + metricsUtil.init(clock); + clock.setInstant(null); + + processMessage(prevDayStartTime.plusSeconds(3)); // yesterday + processMessage(dayStartTime.plusSeconds(3)); // today + processMessage(hourStartTime.minusSeconds(3)); // past hour + processMessage(hourStartTime.plusSeconds(3)); // this hour + } + + private void processMessage(Instant instant) { + clock.setInstant(instant); + + metricsUtil.onNotificationProcessingComplete(++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1)); + + for (int i = 0; i < 10; i++) { + metricsUtil.onNotificationProcessingComplete(msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1)); + } + + clock.setInstant(null); + } + + private void verifyNotificationMetric(Map<String, Object> metricExpected, Map<String, Object> notificationMetrics) { + assertNotNull(notificationMetrics); + assertNotEquals(notificationMetrics.size(), 0); + assertTrue(notificationMetrics.size() >= metricExpected.size()); + + for (Map.Entry<String, Object> entry : metricExpected.entrySet()) { + assertEquals(notificationMetrics.get(entry.getKey()), entry.getValue(), entry.getKey()); + } + } + public static ZipSource getZipSource(String fileName) throws IOException, AtlasBaseException { FileInputStream fs = ZipFileResourceTestUtils.getFileInputStream(fileName); return new ZipSource(fs); } + + private static class TestClock extends Clock { + private final Clock baseClock; + private final ZoneId zone; + private Instant instant = null; + + public TestClock(Clock baseClock, ZoneId zone) { + this.baseClock = baseClock; + this.zone = zone; + } + + @Override + public ZoneId getZone() { + return zone; + } + + @Override + public TestClock withZone(ZoneId zone) { + return new TestClock(baseClock, zone); + } + + @Override + public Instant instant() { + return instant != null ? instant : baseClock.instant(); + } + + public void setInstant(Instant instant) { + this.instant = instant; + } + } } \ No newline at end of file diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 8430fd4..ce2d76f 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -47,7 +47,8 @@ import org.apache.atlas.notification.preprocessor.EntityPreprocessor; import org.apache.atlas.notification.preprocessor.PreprocessorContext; import org.apache.atlas.notification.preprocessor.PreprocessorContext.PreprocessAction; import org.apache.atlas.utils.LruCache; -import org.apache.atlas.util.StatisticsUtil; +import org.apache.atlas.util.AtlasMetricsUtil; +import org.apache.atlas.util.AtlasMetricsUtil.NotificationStat; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; @@ -140,7 +141,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final ServiceState serviceState; private final AtlasInstanceConverter instanceConverter; private final AtlasTypeRegistry typeRegistry; - private final StatisticsUtil statisticsUtil; + private final AtlasMetricsUtil metricsUtil; private final int maxRetries; private final int failedMsgCacheSize; private final int minWaitDuration; @@ -156,10 +157,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final boolean hiveTypesRemoveOwnedRefAttrs; private final boolean rdbmsTypesRemoveOwnedRefAttrs; private final boolean preprocessEnabled; - - private NotificationInterface notificationInterface; - private ExecutorService executors; - private Configuration applicationProperties; + private final NotificationInterface notificationInterface; + private final Configuration applicationProperties; + private ExecutorService executors; @VisibleForTesting final int consumerRetryInterval; @@ -170,14 +170,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Inject public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, ServiceState serviceState, AtlasInstanceConverter instanceConverter, - AtlasTypeRegistry typeRegistry, StatisticsUtil statisticsUtil) throws AtlasException { + AtlasTypeRegistry typeRegistry, AtlasMetricsUtil metricsUtil) throws AtlasException { this.notificationInterface = notificationInterface; this.atlasEntityStore = atlasEntityStore; this.serviceState = serviceState; this.instanceConverter = instanceConverter; this.typeRegistry = typeRegistry; this.applicationProperties = ApplicationProperties.get(); - this.statisticsUtil = statisticsUtil; + this.metricsUtil = metricsUtil; maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1); @@ -475,12 +475,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @VisibleForTesting void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException { - AtlasPerfTracer perf = null; - HookNotification message = kafkaMsg.getMessage(); - String messageUser = message.getUser(); - long startTime = System.currentTimeMillis(); - boolean isFailedMsg = false; - AuditLog auditLog = null; + AtlasPerfTracer perf = null; + HookNotification message = kafkaMsg.getMessage(); + String messageUser = message.getUser(); + long startTime = System.currentTimeMillis(); + NotificationStat stats = new NotificationStat(); + AuditLog auditLog = null; if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name()); @@ -525,7 +525,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClient.API_V1.CREATE_ENTITY.getNormalizedPath()); } - createOrUpdate(entities, false, context); + createOrUpdate(entities, false, stats, context); } break; @@ -546,7 +546,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl // There should only be one root entity entities.getEntities().get(0).setGuid(guid); - createOrUpdate(entities, true, context); + createOrUpdate(entities, true, stats, context); } break; @@ -562,7 +562,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); - atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue())); + EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue())); + + stats.updateStats(response); } catch (ClassCastException cle) { LOG.error("Failed to delete entity {}", deleteRequest); } @@ -579,7 +581,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); } - createOrUpdate(entities, false, context); + createOrUpdate(entities, false, stats, context); } break; @@ -593,7 +595,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClientV2.API_V2.CREATE_ENTITY.getNormalizedPath()); } - createOrUpdate(entities, false, context); + createOrUpdate(entities, false, stats, context); } break; @@ -608,7 +610,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); } - atlasEntityStore.updateEntity(entityId, entity, true); + EntityMutationResponse response = atlasEntityStore.updateEntity(entityId, entity, true); + + stats.updateStats(response); } break; @@ -622,7 +626,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasClientV2.API_V2.UPDATE_ENTITY.getNormalizedPath()); } - createOrUpdate(entities, false, context); + createOrUpdate(entities, false, stats, context); } break; @@ -640,7 +644,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(entity.getTypeName()); - atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes()); + EntityMutationResponse response = atlasEntityStore.deleteByUniqueAttributes(type, entity.getUniqueAttributes()); + + stats.updateStats(response); } } catch (ClassCastException cle) { LOG.error("Failed to do delete entities {}", entities); @@ -661,7 +667,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl LOG.warn("Max retries exceeded for message {}", strMessage, e); - isFailedMsg = true; + stats.isFailedMsg = true; failedMessages.add(strMessage); @@ -689,33 +695,34 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } finally { AtlasPerfTracer.log(perf); - long msgProcessingTime = System.currentTimeMillis() - startTime; + stats.timeTakenMs = System.currentTimeMillis() - startTime; + + metricsUtil.onNotificationProcessingComplete(kafkaMsg.getOffset(), stats); - if (msgProcessingTime > largeMessageProcessingTimeThresholdMs) { + if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) { String strMessage = AbstractNotification.getMessageJson(message); - LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset()); - LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", msgProcessingTime, strMessage.length(), kafkaMsg.getOffset(), strMessage); + LOG.warn("msgProcessingTime={}, msgSize={}, topicOffset={}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset()); + LARGE_MESSAGES_LOG.warn("{\"msgProcessingTime\":{},\"msgSize\":{},\"topicOffset\":{},\"data\":{}}", stats.timeTakenMs, strMessage.length(), kafkaMsg.getOffset(), strMessage); } if (auditLog != null) { - auditLog.setHttpStatus(isFailedMsg ? SC_BAD_REQUEST : SC_OK); - auditLog.setTimeTaken(msgProcessingTime); + auditLog.setHttpStatus(stats.isFailedMsg ? SC_BAD_REQUEST : SC_OK); + auditLog.setTimeTaken(stats.timeTakenMs); AuditFilter.audit(auditLog); } - statisticsUtil.setAvgMsgProcessingTime(msgProcessingTime); } } - private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, PreprocessorContext context) throws AtlasBaseException { + private void createOrUpdate(AtlasEntitiesWithExtInfo entities, boolean isPartialUpdate, NotificationStat stats, PreprocessorContext context) throws AtlasBaseException { List<AtlasEntity> entitiesList = entities.getEntities(); AtlasEntityStream entityStream = new AtlasEntityStream(entities); if (commitBatchSize <= 0 || entitiesList.size() <= commitBatchSize) { EntityMutationResponse response = atlasEntityStore.createOrUpdate(entityStream, isPartialUpdate); - recordProcessedEntities(response, context); + recordProcessedEntities(response, stats, context); } else { for (int fromIdx = 0; fromIdx < entitiesList.size(); fromIdx += commitBatchSize) { int toIndex = fromIdx + commitBatchSize; @@ -733,7 +740,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl EntityMutationResponse response = atlasEntityStore.createOrUpdate(batchStream, isPartialUpdate); - recordProcessedEntities(response, context); + recordProcessedEntities(response, stats, context); RequestContext.get().resetEntityGuidUpdates(); @@ -770,8 +777,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl consumer.commit(partition, kafkaMessage.getOffset() + 1); commitSucceessStatus = true; - statisticsUtil.setKafkaOffsets(kafkaMessage.getOffset()); - statisticsUtil.setLastMsgProcessedTime(); } finally { failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); } @@ -1021,24 +1026,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl return ret; } - private void recordProcessedEntities(EntityMutationResponse mutationResponse, PreprocessorContext context) { - if (mutationResponse != null && context != null) { - if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { - context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments()); + private void recordProcessedEntities(EntityMutationResponse mutationResponse, NotificationStat stats, PreprocessorContext context) { + if (mutationResponse != null) { + if (stats != null) { + stats.updateStats(mutationResponse); } - if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) { - for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) { - if (entity != null && entity.getGuid() != null) { - context.getCreatedEntities().add(entity.getGuid()); + if (context != null) { + if (MapUtils.isNotEmpty(mutationResponse.getGuidAssignments())) { + context.getGuidAssignments().putAll(mutationResponse.getGuidAssignments()); + } + + if (CollectionUtils.isNotEmpty(mutationResponse.getCreatedEntities())) { + for (AtlasEntityHeader entity : mutationResponse.getCreatedEntities()) { + if (entity != null && entity.getGuid() != null) { + context.getCreatedEntities().add(entity.getGuid()); + } } } - } - if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) { - for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) { - if (entity != null && entity.getGuid() != null) { - context.getDeletedEntities().add(entity.getGuid()); + if (CollectionUtils.isNotEmpty(mutationResponse.getDeletedEntities())) { + for (AtlasEntityHeader entity : mutationResponse.getDeletedEntities()) { + if (entity != null && entity.getGuid() != null) { + context.getDeletedEntities().add(entity.getGuid()); + } } } } diff --git a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java index 7887afb..10081ac 100644 --- a/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java +++ b/webapp/src/main/java/org/apache/atlas/web/service/ActiveInstanceElectorService.java @@ -23,7 +23,7 @@ import org.apache.atlas.ha.AtlasServerIdSelector; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; import org.apache.atlas.service.Service; -import org.apache.atlas.util.StatisticsUtil; +import org.apache.atlas.util.AtlasMetricsUtil; import org.apache.commons.configuration.Configuration; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; @@ -55,18 +55,17 @@ import java.util.Set; @Component @Order(1) public class ActiveInstanceElectorService implements Service, LeaderLatchListener { - private static final Logger LOG = LoggerFactory.getLogger(ActiveInstanceElectorService.class); - private final Configuration configuration; - private final ServiceState serviceState; - private final ActiveInstanceState activeInstanceState; - private final StatisticsUtil statisticsUtil; - private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders; - private List<ActiveStateChangeHandler> activeStateChangeHandlers; - private CuratorFactory curatorFactory; - private LeaderLatch leaderLatch; - private String serverId; + private final Configuration configuration; + private final ServiceState serviceState; + private final ActiveInstanceState activeInstanceState; + private final AtlasMetricsUtil metricsUtil; + private Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders; + private List<ActiveStateChangeHandler> activeStateChangeHandlers; + private CuratorFactory curatorFactory; + private LeaderLatch leaderLatch; + private String serverId; /** * Create a new instance of {@link ActiveInstanceElectorService} @@ -78,14 +77,14 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene ActiveInstanceElectorService(Configuration configuration, Set<ActiveStateChangeHandler> activeStateChangeHandlerProviders, CuratorFactory curatorFactory, ActiveInstanceState activeInstanceState, - ServiceState serviceState, StatisticsUtil statisticsUtil) { - this.configuration = configuration; + ServiceState serviceState, AtlasMetricsUtil metricsUtil) { + this.configuration = configuration; this.activeStateChangeHandlerProviders = activeStateChangeHandlerProviders; - this.activeStateChangeHandlers = new ArrayList<>(); - this.curatorFactory = curatorFactory; - this.activeInstanceState = activeInstanceState; - this.serviceState = serviceState; - this.statisticsUtil = statisticsUtil; + this.activeStateChangeHandlers = new ArrayList<>(); + this.curatorFactory = curatorFactory; + this.activeInstanceState = activeInstanceState; + this.serviceState = serviceState; + this.metricsUtil = metricsUtil; } /** @@ -96,9 +95,9 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene */ @Override public void start() throws AtlasException { - statisticsUtil.setServerStartTime(); + metricsUtil.onServerStart(); if (!HAConfiguration.isHAEnabled(configuration)) { - statisticsUtil.setServerActiveTime(); + metricsUtil.onServerActivation(); LOG.info("HA is not enabled, no need to start leader election service"); return; } @@ -156,7 +155,7 @@ public class ActiveInstanceElectorService implements Service, LeaderLatchListene } activeInstanceState.update(serverId); serviceState.setActive(); - statisticsUtil.setServerActiveTime(); + metricsUtil.onServerActivation(); } catch (Exception e) { LOG.error("Got exception while activating", e); notLeader(); diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index c7ba699..fb3ff26 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -25,7 +25,7 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.kafka.*; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.notification.HookNotification; -import org.apache.atlas.util.StatisticsUtil; +import org.apache.atlas.util.AtlasMetricsUtil; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.notification.HookNotificationV1; import org.apache.atlas.repository.converters.AtlasInstanceConverter; @@ -82,7 +82,7 @@ public class NotificationHookConsumerKafkaTest { private AtlasTypeRegistry typeRegistry; @Mock - private StatisticsUtil statisticsUtil; + private AtlasMetricsUtil metricsUtil; @BeforeTest public void setup() throws AtlasException, InterruptedException, AtlasBaseException { @@ -108,7 +108,7 @@ public class NotificationHookConsumerKafkaTest { produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); @@ -127,7 +127,7 @@ public class NotificationHookConsumerKafkaTest { public void consumerConsumesNewMessageButCommitThrowsAnException_MessageOffsetIsRecorded() throws AtlasException, InterruptedException, AtlasBaseException { ExceptionThrowingCommitConsumer consumer = createNewConsumerThatThrowsExceptionInCommit(kafkaNotification, true); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); NotificationHookConsumer.FailedCommitOffsetRecorder failedCommitOffsetRecorder = hookConsumer.failedCommitOffsetRecorder; @@ -163,7 +163,7 @@ public class NotificationHookConsumerKafkaTest { assertNotNull (consumer); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index de316b6..3e35511 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -26,7 +26,7 @@ import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.notification.HookNotification.HookNotificationType; import org.apache.atlas.notification.NotificationInterface.NotificationType; -import org.apache.atlas.util.StatisticsUtil; +import org.apache.atlas.util.AtlasMetricsUtil; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.repository.converters.AtlasInstanceConverter; @@ -77,7 +77,7 @@ public class NotificationHookConsumerTest { private AtlasTypeRegistry typeRegistry; @Mock - private StatisticsUtil statisticsUtil; + private AtlasMetricsUtil metricsUtil; @BeforeMethod public void setup() throws AtlasBaseException { @@ -96,7 +96,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerCanProceedIfServerIsReady() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); @@ -109,7 +109,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); @@ -126,7 +126,7 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); EntityCreateRequest message = mock(EntityCreateRequest.class); @@ -143,7 +143,7 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationConsumer consumer = mock(NotificationConsumer.class); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class))); @@ -157,7 +157,7 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); @@ -177,9 +177,7 @@ public class NotificationHookConsumerTest { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); - + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); notificationHookConsumer.startInternal(configuration, executorService); verify(notificationInterface).createConsumers(NotificationType.HOOK, 1); @@ -197,8 +195,7 @@ public class NotificationHookConsumerTest { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); notificationHookConsumer.startInternal(configuration, executorService); @@ -217,7 +214,7 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsActive(); @@ -237,8 +234,7 @@ public class NotificationHookConsumerTest { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - - final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); doAnswer(new Answer() { @Override @@ -269,8 +265,7 @@ public class NotificationHookConsumerTest { when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - - final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); @@ -335,7 +330,6 @@ public class NotificationHookConsumerTest { when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException()); when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - - return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, statisticsUtil); + return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry, metricsUtil); } } diff --git a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java index 0fe3eba..3ce0c4b 100644 --- a/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java +++ b/webapp/src/test/java/org/apache/atlas/web/service/ActiveInstanceElectorServiceTest.java @@ -23,7 +23,7 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.listener.ActiveStateChangeHandler; -import org.apache.atlas.util.StatisticsUtil; +import org.apache.atlas.util.AtlasMetricsUtil; import org.apache.commons.configuration.Configuration; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.mockito.InOrder; @@ -53,7 +53,7 @@ public class ActiveInstanceElectorServiceTest { private ServiceState serviceState; @Mock - private StatisticsUtil statisticsUtil; + private AtlasMetricsUtil metricsUtil; @BeforeMethod public void setup() { @@ -75,7 +75,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); verify(leaderLatch).start(); @@ -96,7 +96,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); verify(leaderLatch).addListener(activeInstanceElectorService); @@ -108,7 +108,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); verifyZeroInteractions(curatorFactory); @@ -129,7 +129,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.stop(); @@ -151,7 +151,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.stop(); @@ -165,7 +165,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.stop(); verifyZeroInteractions(curatorFactory); @@ -193,7 +193,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); @@ -216,7 +216,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); @@ -249,7 +249,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); @@ -275,7 +275,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader(); @@ -310,7 +310,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, changeHandlers, curatorFactory, - activeInstanceState, serviceState, statisticsUtil); + activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.notLeader(); @@ -322,7 +322,7 @@ public class ActiveInstanceElectorServiceTest { public void testActiveStateSetOnBecomingLeader() { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), - curatorFactory, activeInstanceState, serviceState, statisticsUtil); + curatorFactory, activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.isLeader(); @@ -335,7 +335,7 @@ public class ActiveInstanceElectorServiceTest { public void testPassiveStateSetOnLoosingLeadership() { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), - curatorFactory, activeInstanceState, serviceState, statisticsUtil); + curatorFactory, activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.notLeader(); @@ -362,7 +362,7 @@ public class ActiveInstanceElectorServiceTest { ActiveInstanceElectorService activeInstanceElectorService = new ActiveInstanceElectorService(configuration, new HashSet<ActiveStateChangeHandler>(), - curatorFactory, activeInstanceState, serviceState, statisticsUtil); + curatorFactory, activeInstanceState, serviceState, metricsUtil); activeInstanceElectorService.start(); activeInstanceElectorService.isLeader();