This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new f724df8 ATLAS-3133: enhanced Atlas server to process notifications from multiple Kafka topics f724df8 is described below commit f724df8201328b3b893c2eb8975bca57ecfce3dc Author: Saqeeb Shaikh <saqeeb.sha...@freestoneinfotech.com> AuthorDate: Thu Jul 11 12:28:30 2019 +0530 ATLAS-3133: enhanced Atlas server to process notifications from multiple Kafka topics Signed-off-by: Madhan Neethiraj <mad...@apache.org> (cherry picked from commit 47d4d588f2ed70396bb64b80c6362d8115350339) --- .../java/org/apache/atlas/AtlasConfiguration.java | 26 +++ .../apache/atlas/model/metrics/AtlasMetrics.java | 3 +- .../org/apache/atlas/kafka/AtlasKafkaConsumer.java | 2 +- .../org/apache/atlas/kafka/AtlasKafkaMessage.java | 28 ++-- .../org/apache/atlas/kafka/KafkaNotification.java | 176 +++++++++++++++------ .../org/apache/atlas/kafka/KafkaConsumerTest.java | 32 ++-- .../atlas/kafka/KafkaNotificationMockTest.java | 6 +- .../AbstractNotificationConsumerTest.java | 4 +- .../org/apache/atlas/util/AtlasMetricsCounter.java | 111 +++++++------ .../org/apache/atlas/util/AtlasMetricsUtil.java | 121 +++++++++++--- .../apache/atlas/services/MetricsServiceTest.java | 4 +- .../notification/NotificationHookConsumer.java | 7 +- .../NotificationHookConsumerKafkaTest.java | 5 +- .../notification/NotificationHookConsumerTest.java | 5 +- 14 files changed, 372 insertions(+), 158 deletions(-) diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java index 3ff1316..9da51f5 100644 --- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java +++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java @@ -19,6 +19,7 @@ package org.apache.atlas; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; /** * Enum that encapsulated each property name and its default value. 
@@ -39,6 +40,9 @@ public enum AtlasConfiguration { NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"), NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"), + NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names", "ATLAS_HOOK"), // a comma separated list of topic names + NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names", "ATLAS_ENTITIES"), // a comma separated list of topic names + NOTIFICATION_MESSAGE_MAX_LENGTH_BYTES("atlas.notification.message.max.length.bytes", (1000 * 1000)), NOTIFICATION_MESSAGE_COMPRESSION_ENABLED("atlas.notification.message.compression.enabled", true), NOTIFICATION_SPLIT_MESSAGE_SEGMENTS_WAIT_TIME_SECONDS("atlas.notification.split.message.segments.wait.time.seconds", 15 * 60), @@ -84,6 +88,28 @@ public enum AtlasConfiguration { return APPLICATION_PROPERTIES.getString(propertyName, defaultValue.toString()); } + public String[] getStringArray() { + String[] ret = APPLICATION_PROPERTIES.getStringArray(propertyName); + + if (ret == null || ret.length == 0 || (ret.length == 1 && StringUtils.isEmpty(ret[0]))) { + if (defaultValue != null) { + ret = StringUtils.split(defaultValue.toString(), ','); + } + } + + return ret; + } + + public String[] getStringArray(String... defaultValue) { + String[] ret = APPLICATION_PROPERTIES.getStringArray(propertyName); + + if (ret == null || ret.length == 0 || (ret.length == 1 && StringUtils.isEmpty(ret[0]))) { + ret = defaultValue; + } + + return ret; + } + public Object get() { Object value = APPLICATION_PROPERTIES.getProperty(propertyName); return value == null ? 
defaultValue : value; diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java index c011ad9..a48d93b 100644 --- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java +++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java @@ -55,8 +55,7 @@ public class AtlasMetrics { public static final String STAT_NOTIFY_FAILED_COUNT_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourFailed"; public static final String STAT_NOTIFY_START_TIME_CURR_HOUR = PREFIX_NOTIFICATION + "currentHourStartTime"; public static final String STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME = PREFIX_NOTIFICATION + "lastMessageProcessedTime"; - public static final String STAT_NOTIFY_START_OFFSET = PREFIX_NOTIFICATION + "offsetStart"; - public static final String STAT_NOTIFY_CURRENT_OFFSET = PREFIX_NOTIFICATION + "offsetCurrent"; + public static final String STAT_NOTIFY_TOPIC_OFFSETS = PREFIX_NOTIFICATION + "topicOffsets"; public static final String STAT_NOTIFY_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDay"; public static final String STAT_NOTIFY_AVG_TIME_PREV_DAY = PREFIX_NOTIFICATION + "previousDayAvgTime"; public static final String STAT_NOTIFY_CREATES_COUNT_PREV_DAY = PREFIX_NOTIFICATION + "previousDayEntityCreates"; diff --git a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java index 5c840c3..49f9ba3 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaConsumer.java @@ -88,7 +88,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> { continue; } - messages.add(new AtlasKafkaMessage(message, record.offset(), record.partition())); + messages.add(new AtlasKafkaMessage(message, record.offset(), record.topic(), record.partition())); } } diff --git 
a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java index b04aba9..22bd79f 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java +++ b/notification/src/main/java/org/apache/atlas/kafka/AtlasKafkaMessage.java @@ -18,15 +18,17 @@ package org.apache.atlas.kafka; +import org.apache.kafka.common.TopicPartition; + public class AtlasKafkaMessage<T> { - private final T message; - private final long offset; - private final int partition; - - public AtlasKafkaMessage(T message, long offset, int partition) { - this.message = message; - this.offset = offset; - this.partition = partition; + private final T message; + private final long offset; + private final TopicPartition topicPartition; + + public AtlasKafkaMessage(T message, long offset, String topic, int partition) { + this.message = message; + this.offset = offset; + this.topicPartition = new TopicPartition(topic, partition); } public T getMessage() { @@ -37,8 +39,16 @@ public class AtlasKafkaMessage<T> { return offset; } + public TopicPartition getTopicPartition() { + return topicPartition; + } + + public String getTopic() { + return topicPartition.topic(); + } + public int getPartition() { - return partition; + return topicPartition.partition(); } } diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java index 449eb6f..46c68be 100644 --- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java +++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java @@ -57,20 +57,30 @@ public class KafkaNotification extends AbstractNotification implements Service { public static final String ATLAS_ENTITIES_TOPIC = AtlasConfiguration.NOTIFICATION_ENTITIES_TOPIC_NAME.getString(); protected static final String CONSUMER_GROUP_ID_PROPERTY = "group.id"; + private static 
final String[] ATLAS_HOOK_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC); + private static final String[] ATLAS_ENTITIES_CONSUMER_TOPICS = AtlasConfiguration.NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_ENTITIES_TOPIC); + private static final String DEFAULT_CONSUMER_CLOSED_ERROR_MESSAGE = "This consumer has already been closed."; - private static final Map<NotificationType, String> TOPIC_MAP = new HashMap<NotificationType, String>() { + private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<NotificationType, String>() { { put(NotificationType.HOOK, ATLAS_HOOK_TOPIC); put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC); } }; - private final Properties properties; - private final Long pollTimeOutMs; - private KafkaConsumer consumer; - private KafkaProducer producer; - private String consumerClosedErrorMsg; + private static final Map<NotificationType, String[]> CONSUMER_TOPICS_MAP = new HashMap<NotificationType, String[]>() { + { + put(NotificationType.HOOK, trimAndPurge(ATLAS_HOOK_CONSUMER_TOPICS)); + put(NotificationType.ENTITIES, trimAndPurge(ATLAS_ENTITIES_CONSUMER_TOPICS)); + } + }; + + private final Properties properties; + private final Long pollTimeOutMs; + private final Map<NotificationType, List<KafkaConsumer>> consumers = new HashMap<>(); + private final Map<NotificationType, KafkaProducer> producers = new HashMap<>(); + private String consumerClosedErrorMsg; // ----- Constructors ---------------------------------------------------- @@ -125,8 +135,8 @@ public class KafkaNotification extends AbstractNotification implements Service { } @VisibleForTesting - String getTopicName(NotificationType notificationType) { - return TOPIC_MAP.get(notificationType); + String getProducerTopicName(NotificationType notificationType) { + return PRODUCER_TOPIC_MAP.get(notificationType); } // ----- Service --------------------------------------------------------- @@ 
-156,10 +166,43 @@ public class KafkaNotification extends AbstractNotification implements Service { public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) { LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled); - Properties consumerProperties = getConsumerProperties(notificationType); - AtlasKafkaConsumer kafkaConsumer = new AtlasKafkaConsumer(notificationType, getKafkaConsumer(consumerProperties, notificationType, autoCommitEnabled), autoCommitEnabled, pollTimeOutMs); + String[] topics = CONSUMER_TOPICS_MAP.get(notificationType); + + if (numConsumers < topics.length) { + LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.length, topics.length); + + numConsumers = topics.length; + } else if (numConsumers > topics.length) { + LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics", numConsumers, topics.length, topics.length); + + numConsumers = topics.length; + } + + List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType); + + if (notificationConsumers == null) { + notificationConsumers = new ArrayList<>(numConsumers); + + this.consumers.put(notificationType, notificationConsumers); + } + + List<NotificationConsumer<T>> consumers = new ArrayList<>(); + Properties consumerProperties = getConsumerProperties(notificationType); + + consumerProperties.put("enable.auto.commit", autoCommitEnabled); - List<NotificationConsumer<T>> consumers = Collections.singletonList(kafkaConsumer); + for (int i = 0; i < numConsumers; i++) { + KafkaConsumer existingConsumer = notificationConsumers.size() > i ? 
notificationConsumers.get(i) : null; + KafkaConsumer kafkaConsumer = getOrCreateKafkaConsumer(existingConsumer, consumerProperties, notificationType, i); + + if (notificationConsumers.size() > i) { + notificationConsumers.set(i, kafkaConsumer); + } else { + notificationConsumers.add(kafkaConsumer); + } + + consumers.add(new AtlasKafkaConsumer(notificationType, kafkaConsumer, autoCommitEnabled, pollTimeOutMs)); + } LOG.info("<== KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled); @@ -170,29 +213,33 @@ public class KafkaNotification extends AbstractNotification implements Service { public void close() { LOG.info("==> KafkaNotification.close()"); - if (producer != null) { - producer.close(); - - producer = null; + for (KafkaProducer producer : producers.values()) { + if (producer != null) { + try { + producer.close(); + } catch (Throwable t) { + LOG.error("failed to close Kafka producer. Ignoring", t); + } + } } + producers.clear(); + LOG.info("<== KafkaNotification.close()"); } // ----- AbstractNotification -------------------------------------------- @Override - public void sendInternal(NotificationType type, List<String> messages) throws NotificationException { - if (producer == null) { - createProducer(); - } + public void sendInternal(NotificationType notificationType, List<String> messages) throws NotificationException { + KafkaProducer producer = getOrCreateProducer(notificationType); - sendInternalToProducer(producer, type, messages); + sendInternalToProducer(producer, notificationType, messages); } @VisibleForTesting - void sendInternalToProducer(Producer p, NotificationType type, List<String> messages) throws NotificationException { - String topic = TOPIC_MAP.get(type); + void sendInternalToProducer(Producer p, NotificationType notificationType, List<String> messages) throws NotificationException { + String topic = PRODUCER_TOPIC_MAP.get(notificationType); 
List<MessageContext> messageContexts = new ArrayList<>(); for (String message : messages) { @@ -229,53 +276,82 @@ public class KafkaNotification extends AbstractNotification implements Service { } } + // Get properties for consumer request + @VisibleForTesting + public Properties getConsumerProperties(NotificationType notificationType) { + // find the configured group id for the given notification type + String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY); - public KafkaConsumer getKafkaConsumer(Properties consumerProperties, NotificationType type, boolean autoCommitEnabled) { - if (consumer == null || !isKafkaConsumerOpen(consumer)) { - try { - String topic = TOPIC_MAP.get(type); + if (StringUtils.isEmpty(groupId)) { + throw new IllegalStateException("No configuration group id set for the notification type " + notificationType); + } - consumerProperties.put("enable.auto.commit", autoCommitEnabled); + Properties consumerProperties = new Properties(); + + consumerProperties.putAll(properties); + consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - this.consumer = new KafkaConsumer(consumerProperties); + return consumerProperties; + } + + @VisibleForTesting + public KafkaConsumer getOrCreateKafkaConsumer(KafkaConsumer existingConsumer, Properties consumerProperties, NotificationType notificationType, int idxConsumer) { + KafkaConsumer ret = existingConsumer; - this.consumer.subscribe(Arrays.asList(topic)); - } catch (Exception ee) { - LOG.error("Exception in getKafkaConsumer ", ee); + try { + if (ret == null || !isKafkaConsumerOpen(ret)) { + String[] topics = CONSUMER_TOPICS_MAP.get(notificationType); + String topic = topics[idxConsumer % topics.length]; + + LOG.debug("Creating new KafkaConsumer for topic : {}, index : {}", topic, idxConsumer); + + ret = new KafkaConsumer(consumerProperties); + + ret.subscribe(Arrays.asList(topic)); } + } catch (Exception ee) { + LOG.error("Exception in 
getKafkaConsumer ", ee); } - return this.consumer; + return ret; } + private KafkaProducer getOrCreateProducer(NotificationType notificationType) { + LOG.debug("==> KafkaNotification.getOrCreateProducer()"); - @VisibleForTesting - public - // Get properties for consumer request - Properties getConsumerProperties(NotificationType type) { - // find the configured group id for the given notification type - String groupId = properties.getProperty(type.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY); + KafkaProducer ret = producers.get(notificationType); - if (StringUtils.isEmpty(groupId)) { - throw new IllegalStateException("No configuration group id set for the notification type " + type); - } + if (ret == null) { + synchronized (this) { + ret = producers.get(notificationType); - Properties consumerProperties = new Properties(); + if (ret == null) { + ret = new KafkaProducer(properties); - consumerProperties.putAll(properties); - consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + producers.put(notificationType, ret); + } + } + } - return consumerProperties; + LOG.debug("<== KafkaNotification.getOrCreateProducer()"); + + return ret; } - private synchronized void createProducer() { - LOG.info("==> KafkaNotification.createProducer()"); + public static String[] trimAndPurge(String[] strings) { + List<String> ret = new ArrayList<>(); - if (producer == null) { - producer = new KafkaProducer(properties); + if (strings != null) { + for (int i = 0; i < strings.length; i++) { + String str = StringUtils.trim(strings[i]); + + if (StringUtils.isNotEmpty(str)) { + ret.add(str); + } + } } - LOG.info("<== KafkaNotification.createProducer()"); + return ret.toArray(new String[ret.size()]); } private class MessageContext { diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java index 847caa3..1af1f3e 100644 --- 
a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java @@ -54,7 +54,8 @@ import static org.testng.Assert.*; public class KafkaConsumerTest { private static final String TRAIT_NAME = "MyTrait"; - private final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); + private static final String[] ATLAS_HOOK_CONSUMER_TOPICS = KafkaNotification.trimAndPurge(AtlasConfiguration.NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES.getStringArray(ATLAS_HOOK_TOPIC)); @Mock @@ -67,11 +68,25 @@ public class KafkaConsumerTest { @Test public void testReceive() throws Exception { - Referenceable entity = getEntity(TRAIT_NAME); - EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + for (String topic : ATLAS_HOOK_CONSUMER_TOPICS) { + String traitName = TRAIT_NAME + "_" + topic; + Referenceable entity = getEntity(traitName); + EntityUpdateRequest message = new EntityUpdateRequest("user1", entity); + List<AtlasKafkaMessage<HookNotification>> messageList = testReceiveHelper(message, topic); + assertTrue(messageList.size() > 0); + + HookNotification consumedMessage = messageList.get(0).getMessage(); + + assertMessagesEqual(message, consumedMessage, entity); + } + } + + + private List<AtlasKafkaMessage<HookNotification>> testReceiveHelper(EntityUpdateRequest message, String topic) throws Exception { + String json = AtlasType.toV1Json(new AtlasNotificationMessage<>(new MessageVersion("1.0.0"), message)); - TopicPartition tp = new TopicPartition(ATLAS_HOOK_TOPIC, 0); - List<ConsumerRecord<String, String>> klist = Collections.singletonList(new ConsumerRecord<>(ATLAS_HOOK_TOPIC, 0, 0L, "mykey", json)); + TopicPartition tp = new TopicPartition(topic, 0); + List<ConsumerRecord<String, String>> klist = Collections.singletonList(new 
ConsumerRecord<>(topic, 0, 0L, "mykey", json)); Map mp = Collections.singletonMap(tp, klist); ConsumerRecords records = new ConsumerRecords(mp); @@ -81,12 +96,7 @@ public class KafkaConsumerTest { AtlasKafkaConsumer consumer = new AtlasKafkaConsumer(NotificationType.HOOK, kafkaConsumer, false, 100L); List<AtlasKafkaMessage<HookNotification>> messageList = consumer.receive(); - - assertTrue(messageList.size() > 0); - - HookNotification consumedMessage = messageList.get(0).getMessage(); - - assertMessagesEqual(message, consumedMessage, entity); + return messageList; } @Test diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java index 263903b..9b5891f 100644 --- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java +++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationMockTest.java @@ -74,7 +74,7 @@ public class KafkaNotificationMockTest { KafkaNotification kafkaNotification = new KafkaNotification(configProperties); Producer producer = mock(Producer.class); - String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK); String message = "This is a test message"; Future returnValue = mock(Future.class); TopicPartition topicPartition = new TopicPartition(topicName, 0); @@ -96,7 +96,7 @@ public class KafkaNotificationMockTest { KafkaNotification kafkaNotification = new KafkaNotification(configProperties); Producer producer = mock(Producer.class); - String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK); String message = "This is a test message"; Future returnValue = mock(Future.class); when(returnValue.get()).thenThrow(new 
RuntimeException("Simulating exception")); @@ -121,7 +121,7 @@ public class KafkaNotificationMockTest { KafkaNotification kafkaNotification = new KafkaNotification(configProperties); Producer producer = mock(Producer.class); - String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK); + String topicName = kafkaNotification.getProducerTopicName(NotificationInterface.NotificationType.HOOK); String message1 = "This is a test message1"; String message2 = "This is a test message2"; Future returnValue1 = mock(Future.class); diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java index c963830..05d0d81 100644 --- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java @@ -183,6 +183,8 @@ public class AbstractNotificationConsumerTest { } private static class TestNotificationConsumer extends AbstractNotificationConsumer<TestMessage> { + private static final String TEST_TOPIC_NAME = "TEST_TOPIC"; + private final List<TestMessage> messageList; private int index = 0; @@ -217,7 +219,7 @@ public class AbstractNotificationConsumerTest { public List<AtlasKafkaMessage<TestMessage>> receive(long timeoutMilliSeconds) { List<AtlasKafkaMessage<TestMessage>> tempMessageList = new ArrayList(); for(Object json : messageList) { - tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, -1)); + tempMessageList.add(new AtlasKafkaMessage(deserializer.deserialize((String) json), -1, TEST_TOPIC_NAME, -1)); } return tempMessageList; } diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java index d5a4412..10319d0 100644 --- 
a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsCounter.java @@ -24,6 +24,7 @@ import java.time.Instant; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.atlas.util.AtlasMetricsCounter.Period.*; @@ -87,10 +88,10 @@ public class AtlasMetricsCounter { } } - public Stats report() { + public StatsReport report() { updateForTime(clock.instant()); - return new Stats(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli()); + return new StatsReport(stats, dayStartTime.toEpochMilli(), hourStartTime.toEpochMilli()); } // visible only for testing @@ -179,16 +180,15 @@ public class AtlasMetricsCounter { return LocalDateTime.of(time.toLocalDate().plusDays(1), LocalTime.MIN).toInstant(ZoneOffset.UTC); } - public static class Stats { private static final int NUM_PERIOD = Period.values().length; - private final long dayStartTimeMs; - private final long hourStartTimeMs; - private final long[] count = new long[NUM_PERIOD]; - private final long[] measureSum = new long[NUM_PERIOD]; - private final long[] measureMin = new long[NUM_PERIOD]; - private final long[] measureMax = new long[NUM_PERIOD]; + private final long dayStartTimeMs; + private final long hourStartTimeMs; + private final AtomicLong[] count = new AtomicLong[NUM_PERIOD]; + private final AtomicLong[] measureSum = new AtomicLong[NUM_PERIOD]; + private final AtomicLong[] measureMin = new AtomicLong[NUM_PERIOD]; + private final AtomicLong[] measureMax = new AtomicLong[NUM_PERIOD]; public Stats() { @@ -200,7 +200,57 @@ public class AtlasMetricsCounter { } } - public Stats(Stats other, long dayStartTimeMs, long hourStartTimeMs) { + public void addCount(Period period, long num) { + count[period.ordinal()].addAndGet(num); + } + + public void addMeasure(Period period, long measure) { + int idx = period.ordinal(); + + 
measureSum[idx].addAndGet(measure); + + if (measureMin[idx].get() > measure) { + measureMin[idx].set(measure); + } + + if (measureMax[idx].get() < measure) { + measureMax[idx].set(measure); + } + } + + private void copy(Period src, Period dest) { + int srcIdx = src.ordinal(); + int destIdx = dest.ordinal(); + + count[destIdx].set(count[srcIdx].get()); + measureSum[destIdx].set(measureSum[srcIdx].get()); + measureMin[destIdx].set(measureMin[srcIdx].get()); + measureMax[destIdx].set( measureMax[srcIdx].get()); + } + + private void reset(Period period) { + int idx = period.ordinal(); + + count[idx] = new AtomicLong(0); + measureSum[idx] = new AtomicLong(0); + measureMin[idx] = new AtomicLong(Long.MAX_VALUE); + measureMax[idx] = new AtomicLong(Long.MIN_VALUE); + } + + } + + public static class StatsReport { + private static final int NUM_PERIOD = Period.values().length; + + private final long dayStartTimeMs; + private final long hourStartTimeMs; + private final long[] count = new long[NUM_PERIOD]; + private final long[] measureSum = new long[NUM_PERIOD]; + private final long[] measureMin = new long[NUM_PERIOD]; + private final long[] measureMax = new long[NUM_PERIOD]; + + + public StatsReport(Stats other, long dayStartTimeMs, long hourStartTimeMs) { this.dayStartTimeMs = dayStartTimeMs; this.hourStartTimeMs = hourStartTimeMs; @@ -229,46 +279,9 @@ public class AtlasMetricsCounter { return c != 0 ? 
(measureSum[idx] / c) : 0; } - public void addCount(Period period, long num) { - count[period.ordinal()] += num; - } - - public void addMeasure(Period period, long measure) { - int idx = period.ordinal(); - - measureSum[idx] += measure; - - if (measureMin[idx] > measure) { - measureMin[idx] = measure; - } - - if (measureMax[idx] < measure) { - measureMax[idx] = measure; - } - } - - private void copy(Period src, Period dest) { - int srcIdx = src.ordinal(); - int destIdx = dest.ordinal(); - - count[destIdx] = count[srcIdx]; - measureSum[destIdx] = measureSum[srcIdx]; - measureMin[destIdx] = measureMin[srcIdx]; - measureMax[destIdx] = measureMax[srcIdx]; - } - - private void reset(Period period) { - int idx = period.ordinal(); - - count[idx] = 0; - measureSum[idx] = 0; - measureMin[idx] = Long.MAX_VALUE; - measureMax[idx] = Long.MIN_VALUE; - } - - private void copy(long[] src, long[] dest) { + private void copy(AtomicLong[] src, long[] dest) { for (int i = 0; i < dest.length; i++) { - dest[i] = src[i]; + dest[i] = src[i].get(); } } } diff --git a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java index f658caa..2c78cbc 100644 --- a/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java +++ b/repository/src/main/java/org/apache/atlas/util/AtlasMetricsUtil.java @@ -21,7 +21,7 @@ import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2; -import org.apache.atlas.util.AtlasMetricsCounter.Stats; +import org.apache.atlas.util.AtlasMetricsCounter.StatsReport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,16 +50,15 @@ public class AtlasMetricsUtil { private static final String STATUS_CONNECTED = "connected"; private static final String STATUS_NOT_CONNECTED = "not-connected"; - private 
final AtlasGraph graph; - private long serverStartTime = 0; - private long serverActiveTime = 0; - private long msgOffsetStart = -1; - private long msgOffsetCurrent = 0; - private final AtlasMetricsCounter messagesProcessed = new AtlasMetricsCounter("messagesProcessed"); - private final AtlasMetricsCounter messagesFailed = new AtlasMetricsCounter("messagesFailed"); - private final AtlasMetricsCounter entityCreates = new AtlasMetricsCounter("entityCreates"); - private final AtlasMetricsCounter entityUpdates = new AtlasMetricsCounter("entityUpdates"); - private final AtlasMetricsCounter entityDeletes = new AtlasMetricsCounter("entityDeletes"); + private final AtlasGraph graph; + private long serverStartTime = 0; + private long serverActiveTime = 0; + private final Map<String, TopicStats> topicStats = new HashMap<>(); + private final AtlasMetricsCounter messagesProcessed = new AtlasMetricsCounter("messagesProcessed"); + private final AtlasMetricsCounter messagesFailed = new AtlasMetricsCounter("messagesFailed"); + private final AtlasMetricsCounter entityCreates = new AtlasMetricsCounter("entityCreates"); + private final AtlasMetricsCounter entityUpdates = new AtlasMetricsCounter("entityUpdates"); + private final AtlasMetricsCounter entityDeletes = new AtlasMetricsCounter("entityDeletes"); @Inject public AtlasMetricsUtil(AtlasGraph graph) { @@ -83,7 +82,7 @@ public class AtlasMetricsUtil { serverActiveTime = System.currentTimeMillis(); } - public void onNotificationProcessingComplete(long msgOffset, NotificationStat stats) { + public void onNotificationProcessingComplete(String topicName, int partition, long msgOffset, NotificationStat stats) { messagesProcessed.incrWithMeasure(stats.timeTakenMs); entityCreates.incrBy(stats.entityCreates); entityUpdates.incrBy(stats.entityUpdates); @@ -93,21 +92,33 @@ public class AtlasMetricsUtil { messagesFailed.incr(); } - if (msgOffsetStart == -1) { - msgOffsetStart = msgOffset; + TopicStats topicStat = topicStats.get(topicName); + 
+ if (topicStat == null) { + topicStat = new TopicStats(topicName); + + topicStats.put(topicName, topicStat); } - msgOffsetCurrent = ++msgOffset; + TopicPartitionStat partitionStat = topicStat.get(partition); + + if (partitionStat == null) { + partitionStat = new TopicPartitionStat(topicName, partition, msgOffset, msgOffset); + + topicStat.set(partition, partitionStat); + } + + partitionStat.setCurrentOffset(msgOffset + 1); } public Map<String, Object> getStats() { Map<String, Object> ret = new HashMap<>(); - Stats messagesProcessed = this.messagesProcessed.report(); - Stats messagesFailed = this.messagesFailed.report(); - Stats entityCreates = this.entityCreates.report(); - Stats entityUpdates = this.entityUpdates.report(); - Stats entityDeletes = this.entityDeletes.report(); + StatsReport messagesProcessed = this.messagesProcessed.report(); + StatsReport messagesFailed = this.messagesFailed.report(); + StatsReport entityCreates = this.entityCreates.report(); + StatsReport entityUpdates = this.entityUpdates.report(); + StatsReport entityDeletes = this.entityDeletes.report(); ret.put(STAT_SERVER_START_TIMESTAMP, serverStartTime); ret.put(STAT_SERVER_ACTIVE_TIMESTAMP, serverActiveTime); @@ -115,8 +126,20 @@ public class AtlasMetricsUtil { ret.put(STAT_SERVER_STATUS_BACKEND_STORE, getBackendStoreStatus() ? STATUS_CONNECTED : STATUS_NOT_CONNECTED); ret.put(STAT_SERVER_STATUS_INDEX_STORE, getIndexStoreStatus() ? 
STATUS_CONNECTED : STATUS_NOT_CONNECTED); - ret.put(STAT_NOTIFY_START_OFFSET, msgOffsetStart); - ret.put(STAT_NOTIFY_CURRENT_OFFSET, msgOffsetCurrent); + Map<String, Map<String, Long>> topicOffsets = new HashMap<>(); + + for (TopicStats tStat : topicStats.values()) { + for (TopicPartitionStat tpStat : tStat.partitionStats.values()) { + Map<String, Long> tpOffsets = new HashMap<>(); + + tpOffsets.put("offsetStart", tpStat.startOffset); + tpOffsets.put("offsetCurrent", tpStat.currentOffset); + + topicOffsets.put(tpStat.topicName + "-" + tpStat.partition, tpOffsets); + } + } + + ret.put(STAT_NOTIFY_TOPIC_OFFSETS, topicOffsets); ret.put(STAT_NOTIFY_LAST_MESSAGE_PROCESSED_TIME, this.messagesProcessed.getLastIncrTime().toEpochMilli()); ret.put(STAT_NOTIFY_COUNT_TOTAL, messagesProcessed.getCount(ALL)); @@ -297,4 +320,58 @@ public class AtlasMetricsUtil { return collection != null ? collection.size() : 0; } } + + class TopicStats { + private final String topicName; + private final Map<Integer, TopicPartitionStat> partitionStats = new HashMap<>(); + + public TopicStats(String topicName) { + this.topicName = topicName; + } + + public String getTopicName() { return topicName; } + + public Map<Integer, TopicPartitionStat> getPartitionStats() { return partitionStats; } + + public TopicPartitionStat get(Integer partition) { return partitionStats.get(partition); } + + public void set(Integer partition, TopicPartitionStat partitionStat) { + partitionStats.put(partition, partitionStat); + } + } + + class TopicPartitionStat { + private final String topicName; + private final int partition; + private final long startOffset; + private long currentOffset; + + public TopicPartitionStat(String topicName, int partition, long startOffset, long currentOffset) { + this.topicName = topicName; + this.partition = partition; + this.startOffset = startOffset; + this.currentOffset = currentOffset; + } + + public String getTopicName() { + return topicName; + } + + public int getPartition() { + 
return partition; + } + + public long getStartOffset() { + return startOffset; + } + + public long getCurrentOffset() { + return currentOffset; + } + + public void setCurrentOffset(long currentOffset) { + this.currentOffset = currentOffset; + } + + }; } diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java index b56019e..b2f2633 100644 --- a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java @@ -227,10 +227,10 @@ public class MetricsServiceTest { private void processMessage(Instant instant) { clock.setInstant(instant); - metricsUtil.onNotificationProcessingComplete(++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1)); + metricsUtil.onNotificationProcessingComplete("ATLAS_HOOK", 0, ++msgOffset, new AtlasMetricsUtil.NotificationStat(true, 1)); for (int i = 0; i < 10; i++) { - metricsUtil.onNotificationProcessingComplete(msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1)); + metricsUtil.onNotificationProcessingComplete("ATLAS_HOOK", 0, msgOffset++, new AtlasMetricsUtil.NotificationStat(false, 1)); } clock.setInstant(null); diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 1f8e810..1c8d72b 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -23,7 +23,6 @@ import kafka.utils.ShutdownableThread; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasClientV2; -import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.AtlasException; import org.apache.atlas.AtlasServiceException; import 
org.apache.atlas.RequestContext; @@ -118,7 +117,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private static final String ATTRIBUTE_QUALIFIED_NAME = "qualifiedName"; private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); - private static final String ATLAS_HOOK_TOPIC = AtlasConfiguration.NOTIFICATION_HOOK_TOPIC_NAME.getString(); public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; @@ -701,7 +699,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl stats.timeTakenMs = System.currentTimeMillis() - startTime; - metricsUtil.onNotificationProcessingComplete(kafkaMsg.getOffset(), stats); + metricsUtil.onNotificationProcessingComplete(kafkaMsg.getTopic(), kafkaMsg.getPartition(), kafkaMsg.getOffset(), stats); if (stats.timeTakenMs > largeMessageProcessingTimeThresholdMs) { String strMessage = AbstractNotification.getMessageJson(message); @@ -785,9 +783,8 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { recordFailedMessages(); - TopicPartition partition = new TopicPartition(ATLAS_HOOK_TOPIC, kafkaMessage.getPartition()); + consumer.commit(kafkaMessage.getTopicPartition(), kafkaMessage.getOffset() + 1); - consumer.commit(partition, kafkaMessage.getOffset() + 1); commitSucceessStatus = true; } finally { failedCommitOffsetRecorder.recordIfFailed(commitSucceessStatus, kafkaMessage.getOffset()); diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index fb3ff26..b1b0e9f 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ 
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -182,7 +182,10 @@ public class NotificationHookConsumerKafkaTest { ExceptionThrowingCommitConsumer createNewConsumerThatThrowsExceptionInCommit(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { Properties prop = kafkaNotification.getConsumerProperties(NotificationInterface.NotificationType.HOOK); - KafkaConsumer consumer = kafkaNotification.getKafkaConsumer(prop, NotificationInterface.NotificationType.HOOK, true); + + prop.put("enable.auto.commit", autoCommitEnabled); + + KafkaConsumer consumer = kafkaNotification.getOrCreateKafkaConsumer(null, prop, NotificationInterface.NotificationType.HOOK, 0); return new ExceptionThrowingCommitConsumer(NotificationInterface.NotificationType.HOOK, consumer, autoCommitEnabled, 1000); } diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index 3e35511..ece46a4 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -22,6 +22,7 @@ import org.apache.atlas.AtlasServiceException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; +import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.EntityMutationResponse; import org.apache.atlas.model.notification.HookNotification.HookNotificationType; @@ -136,7 +137,7 @@ public class NotificationHookConsumerTest { when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE); when(message.getEntities()).thenReturn(Arrays.asList(mock)); - hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1)); + 
hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1)); verify(consumer).commit(any(TopicPartition.class), anyInt()); } @@ -150,7 +151,7 @@ public class NotificationHookConsumerTest { when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message")); - hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1)); + hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, KafkaNotification.ATLAS_HOOK_TOPIC, -1)); verifyZeroInteractions(consumer); }