http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 779298a..456a778 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -29,13 +29,14 @@ import org.apache.atlas.RequestContextV1; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.listener.ActiveStateChangeHandler; -import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.notification.HookNotification; +import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest; -import org.apache.atlas.v1.model.notification.HookNotification.EntityDeleteRequest; -import org.apache.atlas.v1.model.notification.HookNotification.EntityPartialUpdateRequest; -import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest; -import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream; @@ -56,10 +57,7 @@ import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import javax.inject.Inject; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.List; +import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -76,37 +74,37 @@ import static org.apache.atlas.AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE; @Order(4) @DependsOn(value = {"atlasTypeDefStoreInitializer", "atlasTypeDefGraphStoreV1"}) public class NotificationHookConsumer implements Service, ActiveStateChangeHandler { - private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); - private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class); - private static final String LOCALHOST = "localhost"; - private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); + private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class); + private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class); + private static final Logger FAILED_LOG = LoggerFactory.getLogger("FAILED"); + private static final String LOCALHOST = "localhost"; private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName(); - public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; - public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; + public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads"; + public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries"; public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize"; - public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; - public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval"; - public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval"; - + public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval"; + public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval"; + public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval"; public static final int SERVER_READY_WAIT_TIME_MS = 1000; - private final AtlasEntityStore atlasEntityStore; - private final ServiceState serviceState; + + private final AtlasEntityStore atlasEntityStore; + private final ServiceState serviceState; private final AtlasInstanceConverter instanceConverter; - private final AtlasTypeRegistry typeRegistry; - private final int maxRetries; - private final int failedMsgCacheSize; + private final AtlasTypeRegistry typeRegistry; + private final int maxRetries; + private final int failedMsgCacheSize; + private final int minWaitDuration; + private final int maxWaitDuration; + + private NotificationInterface notificationInterface; + private ExecutorService executors; + private Configuration applicationProperties; @VisibleForTesting final int consumerRetryInterval; - private final int minWaitDuration; - private final int maxWaitDuration; - - private NotificationInterface notificationInterface; - private ExecutorService executors; - private Configuration applicationProperties; @VisibleForTesting List<HookConsumer> consumers; @@ -116,18 +114,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl ServiceState serviceState, AtlasInstanceConverter instanceConverter, AtlasTypeRegistry typeRegistry) throws AtlasException { this.notificationInterface = notificationInterface; - this.atlasEntityStore = atlasEntityStore; - this.serviceState = serviceState; - this.instanceConverter = instanceConverter; - this.typeRegistry = typeRegistry; - + this.atlasEntityStore = atlasEntityStore; + this.serviceState = serviceState; + this.instanceConverter = instanceConverter; + this.typeRegistry = typeRegistry; this.applicationProperties = ApplicationProperties.get(); - maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); - failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); + maxRetries = applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3); + failedMsgCacheSize = applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20); consumerRetryInterval = applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500); - minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default - maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default + minWaitDuration = applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, consumerRetryInterval); // 500 ms by default + maxWaitDuration = applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, minWaitDuration * 60); // 30 sec by default } @Override @@ -144,21 +141,24 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } if (!HAConfiguration.isHAEnabled(configuration)) { LOG.info("HA is disabled, starting consumers inline."); + startConsumers(executorService); } } private void startConsumers(ExecutorService executorService) { - int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); - List<NotificationConsumer<HookNotificationMessage>> notificationConsumers = - notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); + int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); + List<NotificationConsumer<HookNotification>> notificationConsumers = notificationInterface.createConsumers(NotificationType.HOOK, numThreads); + if (executorService == null) { - executorService = Executors.newFixedThreadPool(notificationConsumers.size(), - new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build()); + executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build()); } + executors = executorService; - for (final NotificationConsumer<HookNotificationMessage> consumer : notificationConsumers) { + + for (final NotificationConsumer<HookNotification> consumer : notificationConsumers) { HookConsumer hookConsumer = new HookConsumer(consumer); + consumers.add(hookConsumer); executors.submit(hookConsumer); } @@ -171,11 +171,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl stopConsumerThreads(); if (executors != null) { executors.shutdown(); + if (!executors.awaitTermination(5000, TimeUnit.MILLISECONDS)) { LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } + executors = null; } + notificationInterface.close(); } catch (InterruptedException e) { LOG.error("Failure in shutting down consumers"); @@ -189,6 +192,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl for (HookConsumer consumer : consumers) { consumer.shutdown(); } + consumers.clear(); } @@ -204,6 +208,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Override public void instanceIsActive() { LOG.info("Reacting to active state: initializing Kafka consumers"); + startConsumers(executors); } @@ -216,6 +221,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @Override public void instanceIsPassive() { LOG.info("Reacting to passive state: shutting down Kafka consumers."); + stop(); } @@ -235,18 +241,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private final long maxDuration; private final long minDuration; private final long resetInterval; + private long lastWaitAt; - private long lastWaitAt; @VisibleForTesting long waitDuration; public AdaptiveWaiter(long minDuration, long maxDuration, long increment) { - this.minDuration = minDuration; - this.maxDuration = maxDuration; - this.increment = increment; - - this.waitDuration = minDuration; - this.lastWaitAt = 0; + this.minDuration = minDuration; + this.maxDuration = maxDuration; + this.increment = increment; + this.waitDuration = minDuration; + this.lastWaitAt = 0; this.resetInterval = maxDuration * 2; } @@ -268,7 +273,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void setWaitDurations() { long timeSinceLastWait = (lastWaitAt == 0) ? 0 : System.currentTimeMillis() - lastWaitAt; + lastWaitAt = System.currentTimeMillis(); + if (timeSinceLastWait > resetInterval) { waitDuration = minDuration; } else { @@ -282,14 +289,14 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl @VisibleForTesting class HookConsumer extends ShutdownableThread { - private final NotificationConsumer<HookNotificationMessage> consumer; - private final AtomicBoolean shouldRun = new AtomicBoolean(false); - private List<HookNotificationMessage> failedMessages = new ArrayList<>(); - - private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); + private final NotificationConsumer<HookNotification> consumer; + private final AtomicBoolean shouldRun = new AtomicBoolean(false); + private final List<HookNotification> failedMessages = new ArrayList<>(); + private final AdaptiveWaiter adaptiveWaiter = new AdaptiveWaiter(minWaitDuration, maxWaitDuration, minWaitDuration); - public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) { + public HookConsumer(NotificationConsumer<HookNotification> consumer) { super("atlas-hook-consumer-thread", false); + this.consumer = consumer; } @@ -306,8 +313,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { while (shouldRun.get()) { try { - List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(); - for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) { + List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive(); + + for (AtlasKafkaMessage<HookNotification> msg : messages) { handleMessage(msg); } } catch (IllegalStateException ex) { @@ -315,6 +323,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } catch (Exception e) { if (shouldRun.get()) { LOG.warn("Exception in NotificationHookConsumer", e); + adaptiveWaiter.pause(e); } else { break; @@ -324,6 +333,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } finally { if (consumer != null) { LOG.info("closing NotificationConsumer"); + consumer.close(); } @@ -332,11 +342,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } @VisibleForTesting - void handleMessage(AtlasKafkaMessage<HookNotificationMessage> kafkaMsg) throws AtlasServiceException, AtlasException { - AtlasPerfTracer perf = null; - - HookNotificationMessage message = kafkaMsg.getMessage(); - String messageUser = message.getUser(); + void handleMessage(AtlasKafkaMessage<HookNotification> kafkaMsg) throws AtlasServiceException, AtlasException { + AtlasPerfTracer perf = null; + HookNotification message = kafkaMsg.getMessage(); + String messageUser = message.getUser(); if (AtlasPerfTracer.isPerfTraceEnabled(PERF_LOG)) { perf = AtlasPerfTracer.getPerfTracer(PERF_LOG, message.getType().name()); @@ -344,21 +353,25 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { // Used for intermediate conversions during create and update - AtlasEntity.AtlasEntitiesWithExtInfo entities = null; + AtlasEntitiesWithExtInfo entities = null; + for (int numRetries = 0; numRetries < maxRetries; numRetries++) { if (LOG.isDebugEnabled()) { LOG.debug("handleMessage({}): attempt {}", message.getType().name(), numRetries); } + try { RequestContextV1 requestContext = RequestContextV1.get(); + requestContext.setUser(messageUser); switch (message.getType()) { case ENTITY_CREATE: - EntityCreateRequest createRequest = (EntityCreateRequest) message; + final EntityCreateRequest createRequest = (EntityCreateRequest) message; if (numRetries == 0) { // audit only on the first attempt AtlasBaseClient.API api = AtlasClient.API_V1.CREATE_ENTITY; + audit(messageUser, api.getMethod(), api.getNormalizedPath()); } @@ -372,19 +385,16 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (numRetries == 0) { // audit only on the first attempt AtlasBaseClient.API api = UPDATE_ENTITY_BY_ATTRIBUTE; - audit(messageUser, api.getMethod(), - String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName())); + + audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName())); } Referenceable referenceable = partialUpdateRequest.getEntity(); + entities = instanceConverter.toAtlasEntity(referenceable); AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName()); - String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() { - { - put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); - } - }); + String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, Collections.singletonMap(partialUpdateRequest.getAttribute(), (Object)partialUpdateRequest.getAttributeValue())); // There should only be one root entity entities.getEntities().get(0).setGuid(guid); @@ -397,30 +407,30 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (numRetries == 0) { // audit only on the first attempt AtlasBaseClient.API api = DELETE_ENTITY_BY_ATTRIBUTE; - audit(messageUser, api.getMethod(), - String.format(api.getNormalizedPath(), deleteRequest.getTypeName())); + + audit(messageUser, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName())); } try { AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName()); - atlasEntityStore.deleteByUniqueAttributes(type, - new HashMap<String, Object>() {{ - put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); - }}); + + atlasEntityStore.deleteByUniqueAttributes(type, Collections.singletonMap(deleteRequest.getAttribute(), (Object) deleteRequest.getAttributeValue())); } catch (ClassCastException cle) { LOG.error("Failed to do a partial update on Entity"); } break; case ENTITY_FULL_UPDATE: - EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; + final EntityUpdateRequest updateRequest = (EntityUpdateRequest) message; if (numRetries == 0) { // audit only on the first attempt AtlasBaseClient.API api = UPDATE_ENTITY; + audit(messageUser, api.getMethod(), api.getNormalizedPath()); } entities = instanceConverter.toAtlasEntities(updateRequest.getEntities()); + atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false); break; @@ -433,6 +443,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl LOG.warn("Error handling message", e); try { LOG.info("Sleeping for {} ms before retry", consumerRetryInterval); + Thread.sleep(consumerRetryInterval); } catch (InterruptedException ie) { LOG.error("Notification consumer thread sleep interrupted"); @@ -440,7 +451,9 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl if (numRetries == (maxRetries - 1)) { LOG.warn("Max retries exceeded for message {}", message, e); + failedMessages.add(message); + if (failedMessages.size() >= failedMsgCacheSize) { recordFailedMessages(); } @@ -458,15 +471,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl private void recordFailedMessages() { //logging failed messages - for (HookNotificationMessage message : failedMessages) { + for (HookNotification message : failedMessages) { FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", AbstractNotification.getMessageJson(message)); } + failedMessages.clear(); } - private void commit(AtlasKafkaMessage<HookNotificationMessage> kafkaMessage) { + private void commit(AtlasKafkaMessage<HookNotification> kafkaMessage) { recordFailedMessages(); + TopicPartition partition = new TopicPartition("ATLAS_HOOK", kafkaMessage.getPartition()); + consumer.commit(partition, kafkaMessage.getOffset() + 1); } @@ -474,22 +490,23 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl try { while (serviceState.getState() != ServiceState.ServiceStateValue.ACTIVE) { try { - LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", - SERVER_READY_WAIT_TIME_MS); + LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS); + timer.sleep(SERVER_READY_WAIT_TIME_MS); } catch (InterruptedException e) { - LOG.info("Interrupted while waiting for Atlas Server to become ready, " - + "exiting consumer thread.", e); + LOG.info("Interrupted while waiting for Atlas Server to become ready, " + "exiting consumer thread.", e); + return false; } } } catch (Throwable e) { - LOG.info( - "Handled AtlasServiceException while waiting for Atlas Server to become ready, " - + "exiting consumer thread.", e); + LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", e); + return false; } + LOG.info("Atlas Server is ready, can start reading Kafka events."); + return true; } @@ -504,12 +521,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl } super.initiateShutdown(); + shouldRun.set(false); + if (consumer != null) { consumer.wakeup(); } super.awaitShutdown(); + LOG.info("<== HookConsumer shutdown()"); } } @@ -519,7 +539,6 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl LOG.debug("==> audit({},{}, {})", messageUser, method, path); } - AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, - DateTimeHelper.formatDateUTC(new Date())); + AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST, DateTimeHelper.formatDateUTC(new Date())); } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java index 517d25f..5baafeb 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java @@ -21,11 +21,13 @@ package org.apache.atlas.notification; import com.google.common.collect.ImmutableSet; import org.apache.atlas.AtlasClient; import org.apache.atlas.kafka.NotificationProvider; +import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Referenceable; import org.apache.atlas.v1.model.instance.Struct; -import org.apache.atlas.v1.model.typedef.TraitTypeDefinition; -import org.apache.atlas.v1.model.notification.EntityNotification; +import org.apache.atlas.v1.model.notification.EntityNotificationV1; +import org.apache.atlas.v1.model.notification.EntityNotificationV1.OperationType; +import org.apache.atlas.v1.model.typedef.*; import org.apache.atlas.type.AtlasType; import org.apache.atlas.v1.typesystem.types.utils.TypesUtil; import org.apache.atlas.web.integration.BaseResourceIT; @@ -42,33 +44,35 @@ import static org.testng.Assert.assertTrue; * Entity Notification Integration Tests. */ public class EntityNotificationIT extends BaseResourceIT { - - private final String DATABASE_NAME = "db" + randomString(); - private final String TABLE_NAME = "table" + randomString(); - private NotificationInterface notificationInterface = NotificationProvider.get(); - private Id tableId; - private Id dbId; - private String traitName; - private NotificationConsumer notificationConsumer; + private final String DATABASE_NAME = "db" + randomString(); + private final String TABLE_NAME = "table" + randomString(); + private final NotificationInterface notificationInterface = NotificationProvider.get(); + private Id tableId; + private Id dbId; + private String traitName; + private NotificationConsumer notificationConsumer; @BeforeClass public void setUp() throws Exception { super.setUp(); + createTypeDefinitionsV1(); + Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(DATABASE_NAME); + dbId = createInstance(HiveDBInstance); - notificationConsumer = notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1).get(0); + notificationConsumer = notificationInterface.createConsumers(NotificationType.ENTITIES, 1).get(0); } public void testCreateEntity() throws Exception { Referenceable tableInstance = createHiveTableInstanceBuiltIn(DATABASE_NAME, TABLE_NAME, dbId); + tableId = createInstance(tableInstance); final String guid = tableId._getId(); - waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); } public void testUpdateEntity() throws Exception { @@ -79,83 +83,83 @@ public class EntityNotificationIT extends BaseResourceIT { atlasClientV1.updateEntityAttribute(guid, property, newValue); - waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE_BUILTIN, guid)); } public void testDeleteEntity() throws Exception { - final String tableName = "table-" + randomString(); - final String dbName = "db-" + randomString(); - Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName); - Id dbId = createInstance(HiveDBInstance); + final String tableName = "table-" + randomString(); + final String dbName = "db-" + randomString(); + final Referenceable HiveDBInstance = createHiveDBInstanceBuiltIn(dbName); + final Id dbId = createInstance(HiveDBInstance); + final Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId); + final Id tableId = createInstance(tableInstance); + final String guid = tableId._getId(); - Referenceable tableInstance = createHiveTableInstanceBuiltIn(dbName, tableName, dbId); - final Id tableId = createInstance(tableInstance); - final String guid = tableId._getId(); - - waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE_BUILTIN, guid)); final String name = (String) tableInstance.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME); atlasClientV1.deleteEntity(HIVE_TABLE_TYPE_BUILTIN, AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); - waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); + waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.ENTITY_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); } public void testAddTrait() throws Exception { String superSuperTraitName = "SuperTrait" + randomString(); - createTrait(superSuperTraitName); - - String superTraitName = "SuperTrait" + randomString(); - createTrait(superTraitName, superSuperTraitName); + String superTraitName = "SuperTrait" + randomString(); traitName = "Trait" + randomString(); + + createTrait(superSuperTraitName); + createTrait(superTraitName, superSuperTraitName); createTrait(traitName, superTraitName); - Struct traitInstance = new Struct(traitName); + Struct traitInstance = new Struct(traitName); String traitInstanceJSON = AtlasType.toV1Json(traitInstance); + LOG.debug("Trait instance = {}", traitInstanceJSON); final String guid = tableId._getId(); atlasClientV1.addTrait(guid, traitInstance); - EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); + EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); Referenceable entity = entityNotification.getEntity(); + assertTrue(entity.getTraitNames().contains(traitName)); - List<Struct> allTraits = entityNotification.getAllTraits(); + List<Struct> allTraits = entityNotification.getAllTraits(); List<String> allTraitNames = new LinkedList<>(); for (Struct struct : allTraits) { allTraitNames.add(struct.getTypeName()); } + assertTrue(allTraitNames.contains(traitName)); assertTrue(allTraitNames.contains(superTraitName)); assertTrue(allTraitNames.contains(superSuperTraitName)); String anotherTraitName = "Trait" + randomString(); + createTrait(anotherTraitName, superTraitName); - traitInstance = new Struct(anotherTraitName); + traitInstance = new Struct(anotherTraitName); traitInstanceJSON = AtlasType.toV1Json(traitInstance); + LOG.debug("Trait instance = {}", traitInstanceJSON); atlasClientV1.addTrait(guid, traitInstance); - entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); + entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, newNotificationPredicate(OperationType.TRAIT_ADD, HIVE_TABLE_TYPE_BUILTIN, guid)); - allTraits = entityNotification.getAllTraits(); + allTraits = entityNotification.getAllTraits(); allTraitNames = new LinkedList<>(); for (Struct struct : allTraits) { allTraitNames.add(struct.getTypeName()); } + assertTrue(allTraitNames.contains(traitName)); assertTrue(allTraitNames.contains(anotherTraitName)); // verify that the super type shows up twice in all traits @@ -167,8 +171,8 @@ public class EntityNotificationIT extends BaseResourceIT { atlasClientV1.deleteTrait(guid, traitName); - EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, - newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); + EntityNotificationV1 entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME, + newNotificationPredicate(EntityNotificationV1.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE_BUILTIN, guid)); assertFalse(entityNotification.getEntity().getTraitNames().contains(traitName)); } @@ -177,11 +181,15 @@ public class EntityNotificationIT extends BaseResourceIT { // ----- helper methods --------------------------------------------------- private void createTrait(String traitName, String ... superTraitNames) throws Exception { - TraitTypeDefinition trait = - TypesUtil.createTraitTypeDef(traitName, null, ImmutableSet.copyOf(superTraitNames)); + TraitTypeDefinition traitDef = TypesUtil.createTraitTypeDef(traitName, null, ImmutableSet.copyOf(superTraitNames)); + TypesDef typesDef = new TypesDef(Collections.<EnumTypeDefinition>emptyList(), + Collections.<StructTypeDefinition>emptyList(), + Collections.singletonList(traitDef), + Collections.<ClassTypeDefinition>emptyList()); + String traitDefinitionJSON = AtlasType.toV1Json(typesDef); - String traitDefinitionJSON = AtlasType.toV1Json(trait); LOG.debug("Trait definition = {}", traitDefinitionJSON); + createType(traitDefinitionJSON); } http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java index 1f045e4..f248593 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java @@ -20,14 +20,13 @@ package org.apache.atlas.notification; import org.apache.atlas.EntityAuditEvent; import org.apache.atlas.kafka.NotificationProvider; +import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Id; import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.notification.HookNotification; -import org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage; -import org.apache.atlas.v1.model.notification.HookNotification.EntityDeleteRequest; -import org.apache.atlas.v1.model.notification.HookNotification.EntityPartialUpdateRequest; -import org.apache.atlas.v1.model.notification.HookNotification.EntityCreateRequest; -import org.apache.atlas.v1.model.notification.HookNotification.EntityUpdateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityDeleteRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityPartialUpdateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityUpdateRequest; import org.apache.atlas.web.integration.BaseResourceIT; import org.codehaus.jettison.json.JSONArray; import org.testng.annotations.AfterClass; @@ -40,18 +39,19 @@ import static java.lang.Thread.sleep; import static org.testng.Assert.assertEquals; public class NotificationHookConsumerIT extends BaseResourceIT { - private static final String TEST_USER = "testuser"; - public static final String NAME = "name"; - public static final String DESCRIPTION = "description"; + + public static final String NAME = "name"; + public static final String DESCRIPTION = "description"; public static final String QUALIFIED_NAME = "qualifiedName"; - public static final String CLUSTER_NAME = "clusterName"; + public static final String CLUSTER_NAME = "clusterName"; - private NotificationInterface notificationInterface = NotificationProvider.get(); + private final NotificationInterface notificationInterface = NotificationProvider.get(); @BeforeClass public void setUp() throws Exception { super.setUp(); + createTypeDefinitionsV1(); } @@ -60,29 +60,33 @@ public class NotificationHookConsumerIT extends BaseResourceIT { notificationInterface.close(); } - private void sendHookMessage(HookNotificationMessage message) throws NotificationException, InterruptedException { + private void sendHookMessage(HookNotification message) throws NotificationException, InterruptedException { notificationInterface.send(NotificationInterface.NotificationType.HOOK, message); + sleep(1000); } @Test public void testMessageHandleFailureConsumerContinues() throws Exception { //send invalid message - update with invalid type - sendHookMessage(new HookNotification.EntityPartialUpdateRequest(TEST_USER, randomString(), null, null, - new Referenceable(randomString()))); + sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, randomString(), null, null, new Referenceable(randomString()))); //send valid message final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - String dbName = "db" + randomString(); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); entity.set(CLUSTER_NAME, randomString()); + sendHookMessage(new EntityCreateRequest(TEST_USER, entity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE_BUILTIN, entity.get(NAME))); + return results.length() == 1; } }); @@ -91,24 +95,28 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testCreateEntity() throws Exception { final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - String dbName = "db" + randomString(); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); entity.set(CLUSTER_NAME, randomString()); sendHookMessage(new EntityCreateRequest(TEST_USER, entity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, entity.get(QUALIFIED_NAME))); + return results.length() == 1; } }); //Assert that user passed in hook message is used in audit - Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME)); - List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1); + Referenceable instance = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, (String) entity.get(QUALIFIED_NAME)); + List<EntityAuditEvent> events = atlasClientV1.getEntityAuditEvents(instance.getId()._getId(), (short) 1); + assertEquals(events.size(), 1); assertEquals(events.get(0).getUser(), TEST_USER); } @@ -116,7 +124,8 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testUpdateEntityPartial() throws Exception { final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String dbName = "db" + randomString(); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); @@ -125,25 +134,31 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); + newEntity.set("owner", randomString()); + sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { Referenceable localEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); + return (localEntity.get("owner") != null && localEntity.get("owner").equals(newEntity.get("owner"))); } }); //Its partial update and un-set fields are not updated Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); + assertEquals(actualEntity.get(DESCRIPTION), entity.get(DESCRIPTION)); } @Test public void testUpdatePartialUpdatingQualifiedName() throws Exception { final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String dbName = "db" + randomString(); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); @@ -152,28 +167,32 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String newName = "db" + randomString(); + final String newName = "db" + randomString(); + newEntity.set(QUALIFIED_NAME, newName); sendHookMessage(new EntityPartialUpdateRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName, newEntity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newName)); + return results.length() == 1; } }); //no entity with the old qualified name JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, dbName)); - assertEquals(results.length(), 0); + assertEquals(results.length(), 0); } @Test public void testDeleteByQualifiedName() throws Exception { - Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String dbName = "db" + randomString(); + final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); @@ -182,10 +201,12 @@ public class NotificationHookConsumerIT extends BaseResourceIT { final String dbId = atlasClientV1.createEntity(entity).get(0); sendHookMessage(new EntityDeleteRequest(TEST_USER, DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { Referenceable getEntity = atlasClientV1.getEntity(dbId); + return getEntity.getId().getState() == Id.EntityState.DELETED; } }); @@ -193,8 +214,9 @@ public class NotificationHookConsumerIT extends BaseResourceIT { @Test public void testUpdateEntityFullUpdate() throws Exception { - Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); - final String dbName = "db" + randomString(); + final Referenceable entity = new Referenceable(DATABASE_TYPE_BUILTIN); + final String dbName = "db" + randomString(); + entity.set(NAME, dbName); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, dbName); @@ -203,6 +225,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT { atlasClientV1.createEntity(entity); final Referenceable newEntity = new Referenceable(DATABASE_TYPE_BUILTIN); + newEntity.set(NAME, randomString()); newEntity.set(DESCRIPTION, randomString()); newEntity.set("owner", randomString()); @@ -211,18 +234,19 @@ public class NotificationHookConsumerIT extends BaseResourceIT { //updating unique attribute sendHookMessage(new EntityUpdateRequest(TEST_USER, newEntity)); + waitFor(MAX_WAIT_TIME, new Predicate() { @Override public boolean evaluate() throws Exception { JSONArray results = searchByDSL(String.format("%s where qualifiedName='%s'", DATABASE_TYPE_BUILTIN, newEntity.get(QUALIFIED_NAME))); + return results.length() == 1; } }); Referenceable actualEntity = atlasClientV1.getEntity(DATABASE_TYPE_BUILTIN, QUALIFIED_NAME, dbName); + assertEquals(actualEntity.get(DESCRIPTION), newEntity.get(DESCRIPTION)); assertEquals(actualEntity.get("owner"), newEntity.get("owner")); } - - } http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java index 68497e0..4ea13c7 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java @@ -25,9 +25,10 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.kafka.AtlasKafkaMessage; import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.kafka.NotificationProvider; -import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; +import org.apache.atlas.model.notification.HookNotification; import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.notification.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotificationV1; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.EntityStream; @@ -41,7 +42,7 @@ import org.testng.Assert; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; -import static org.apache.atlas.v1.model.notification.HookNotification.HookNotificationMessage; + import java.util.List; import org.apache.atlas.kafka.AtlasKafkaConsumer; @@ -57,11 +58,11 @@ import static org.testng.Assert.*; public class NotificationHookConsumerKafkaTest { - - public static final String NAME = "name"; - public static final String DESCRIPTION = "description"; + public static final String NAME = "name"; + public static final String DESCRIPTION = "description"; public static final String QUALIFIED_NAME = "qualifiedName"; - private NotificationInterface notificationInterface = NotificationProvider.get(); + + private final NotificationInterface notificationInterface = NotificationProvider.get(); @Mock @@ -81,10 +82,14 @@ public class NotificationHookConsumerKafkaTest { @BeforeTest public void setup() throws AtlasException, InterruptedException, AtlasBaseException { MockitoAnnotations.initMocks(this); - AtlasType mockType = mock(AtlasType.class); + + AtlasType mockType = mock(AtlasType.class); + AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class); + when(typeRegistry.getType(anyString())).thenReturn(mockType); - AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); + when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity); + kafkaNotification = startKafkaServer(); } @@ -97,19 +102,20 @@ public class NotificationHookConsumerKafkaTest { @Test public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException, AtlasBaseException { try { - produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user1", createEntity())); - NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, false); - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, false); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); + verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); // produce another message, and make sure it moves ahead. If commit succeeded, this would work. - produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user2", createEntity())); consumeOneMessage(consumer, hookConsumer); + verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); reset(atlasEntityStore); } @@ -121,22 +127,20 @@ public class NotificationHookConsumerKafkaTest { @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled") public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception { try { - produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user3", createEntity())); - NotificationConsumer<HookNotificationMessage> consumer = createNewConsumer(kafkaNotification, true); + NotificationConsumer<HookNotification> consumer = createNewConsumer(kafkaNotification, true); assertNotNull (consumer); - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); - + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); consumeOneMessage(consumer, hookConsumer); verify(atlasEntityStore).createOrUpdate(any(EntityStream.class), anyBoolean()); // produce another message, but this will not be consumed, as commit code is not executed in hook consumer. - produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity())); + produceMessage(new HookNotificationV1.EntityCreateRequest("test_user4", createEntity())); consumeOneMessage(consumer, hookConsumer); verify(atlasEntityStore,times(2)).createOrUpdate(any(EntityStream.class), anyBoolean()); @@ -146,18 +150,19 @@ public class NotificationHookConsumerKafkaTest { } } - AtlasKafkaConsumer<HookNotificationMessage> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { + AtlasKafkaConsumer<HookNotification> createNewConsumer(KafkaNotification kafkaNotification, boolean autoCommitEnabled) { return (AtlasKafkaConsumer) kafkaNotification.createConsumers(NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0); } - void consumeOneMessage(NotificationConsumer<HookNotificationMessage> consumer, + void consumeOneMessage(NotificationConsumer<HookNotification> consumer, NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException { try { long startTime = System.currentTimeMillis(); //fetch starting time + while ((System.currentTimeMillis() - startTime) < 10000) { - List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(); + List<AtlasKafkaMessage<HookNotification>> messages = consumer.receive(); - for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) { + for (AtlasKafkaMessage<HookNotification> msg : messages) { hookConsumer.handleMessage(msg); } @@ -172,19 +177,25 @@ public class NotificationHookConsumerKafkaTest { Referenceable createEntity() { final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE); + entity.set(NAME, "db" + randomString()); entity.set(DESCRIPTION, randomString()); entity.set(QUALIFIED_NAME, randomString()); + return entity; } KafkaNotification startKafkaServer() throws AtlasException, InterruptedException { Configuration applicationProperties = ApplicationProperties.get(); + applicationProperties.setProperty("atlas.kafka.data", "target/" + RandomStringUtils.randomAlphanumeric(5)); kafkaNotification = new KafkaNotification(applicationProperties); + kafkaNotification.start(); + Thread.sleep(2000); + return kafkaNotification; } @@ -192,8 +203,7 @@ public class NotificationHookConsumerKafkaTest { return RandomStringUtils.randomAlphanumeric(10); } - private void produceMessage(HookNotificationMessage message) throws NotificationException { + private void produceMessage(HookNotification message) throws NotificationException { kafkaNotification.send(NotificationInterface.NotificationType.HOOK, message); } - } http://git-wip-us.apache.org/repos/asf/atlas/blob/f01e46d7/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index 2d3d5ba..f8bd9a1 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -22,10 +22,12 @@ import org.apache.atlas.AtlasServiceException; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.ha.HAConfiguration; import org.apache.atlas.kafka.AtlasKafkaMessage; -import org.apache.atlas.model.instance.AtlasEntity; +import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo; import org.apache.atlas.model.instance.EntityMutationResponse; +import org.apache.atlas.model.notification.HookNotification.HookNotificationType; +import org.apache.atlas.notification.NotificationInterface.NotificationType; import org.apache.atlas.v1.model.instance.Referenceable; -import org.apache.atlas.v1.model.notification.HookNotification; +import org.apache.atlas.v1.model.notification.HookNotificationV1.EntityCreateRequest; import org.apache.atlas.repository.converters.AtlasInstanceConverter; import org.apache.atlas.repository.store.graph.AtlasEntityStore; import org.apache.atlas.repository.store.graph.v1.EntityStream; @@ -43,6 +45,7 @@ import org.testng.annotations.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; @@ -75,20 +78,24 @@ public class NotificationHookConsumerTest { @BeforeMethod public void setup() throws AtlasBaseException { MockitoAnnotations.initMocks(this); - AtlasType mockType = mock(AtlasType.class); + + AtlasType mockType = mock(AtlasType.class); + AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntitiesWithExtInfo.class); + when(typeRegistry.getType(anyString())).thenReturn(mockType); - AtlasEntity.AtlasEntitiesWithExtInfo mockEntity = mock(AtlasEntity.AtlasEntitiesWithExtInfo.class); when(instanceConverter.toAtlasEntities(anyList())).thenReturn(mockEntity); + EntityMutationResponse mutationResponse = mock(EntityMutationResponse.class); + when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenReturn(mutationResponse); } @Test public void testConsumerCanProceedIfServerIsReady() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); - NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); assertTrue(hookConsumer.serverAvailable(timer)); @@ -98,10 +105,9 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); - NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); when(serviceState.getState()) .thenReturn(ServiceState.ServiceStateValue.PASSIVE) @@ -116,35 +122,30 @@ public class NotificationHookConsumerTest { @Test public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException, AtlasException { - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationConsumer consumer = mock(NotificationConsumer.class); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(consumer); - HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationConsumer consumer = mock(NotificationConsumer.class); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + EntityCreateRequest message = mock(EntityCreateRequest.class); + Referenceable mock = mock(Referenceable.class); + when(message.getUser()).thenReturn("user"); - when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE); - Referenceable mock = mock(Referenceable.class); + when(message.getType()).thenReturn(HookNotificationType.ENTITY_CREATE); when(message.getEntities()).thenReturn(Arrays.asList(mock)); hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1)); + verify(consumer).commit(any(TopicPartition.class), anyInt()); } @Test public void testCommitIsNotCalledEvenWhenMessageProcessingFails() throws AtlasServiceException, AtlasException, AtlasBaseException { - NotificationHookConsumer notificationHookConsumer = - new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationConsumer consumer = mock(NotificationConsumer.class); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(consumer); - HookNotification.EntityCreateRequest message = new HookNotification.EntityCreateRequest("user", - new ArrayList<Referenceable>() { - { - add(mock(Referenceable.class)); - } - }); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationConsumer consumer = mock(NotificationConsumer.class); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(consumer); + EntityCreateRequest message = new EntityCreateRequest("user", Collections.singletonList(mock(Referenceable.class))); + when(atlasEntityStore.createOrUpdate(any(EntityStream.class), anyBoolean())).thenThrow(new RuntimeException("Simulating exception in processing message")); + hookConsumer.handleMessage(new AtlasKafkaMessage(message, -1, -1)); verifyZeroInteractions(consumer); @@ -152,10 +153,10 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws Exception { - NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); - NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); - NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + NotificationHookConsumer.HookConsumer hookConsumer = notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)); + NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); + doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS); when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.PASSIVE); @@ -164,58 +165,75 @@ public class NotificationHookConsumerTest { @Test public void testConsumersStartedIfHAIsDisabled() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(false); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - consumers.add(mock(NotificationConsumer.class)); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). - thenReturn(consumers); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + notificationHookConsumer.startInternal(configuration, executorService); - verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); + + verify(notificationInterface).createConsumers(NotificationType.HOOK, 1); verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); } @Test public void testConsumersAreNotStartedIfHAIsEnabled() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - consumers.add(mock(NotificationConsumer.class)); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). - thenReturn(consumers); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + notificationHookConsumer.startInternal(configuration, executorService); + verifyZeroInteractions(notificationInterface); } @Test public void testConsumersAreStartedWhenInstanceBecomesActive() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(configuration.containsKey(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - consumers.add(mock(NotificationConsumer.class)); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)). - thenReturn(consumers); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); + NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); + notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsActive(); - verify(notificationInterface).createConsumers(NotificationInterface.NotificationType.HOOK, 1); + + verify(notificationInterface).createConsumers(NotificationType.HOOK, 1); verify(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); } @Test public void testConsumersAreStoppedWhenInstanceBecomesPassive() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); - consumers.add(notificationConsumerMock); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers); final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); doAnswer(new Answer() { @@ -223,12 +241,14 @@ public class NotificationHookConsumerTest { public Object answer(InvocationOnMock invocationOnMock) throws Throwable { notificationHookConsumer.consumers.get(0).start(); Thread.sleep(500); + return null; } }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); + verify(notificationInterface).close(); verify(executorService).shutdown(); verify(notificationConsumerMock).wakeup(); @@ -236,18 +256,21 @@ public class NotificationHookConsumerTest { @Test public void consumersStoppedBeforeStarting() throws Exception { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); - consumers.add(notificationConsumerMock); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers); final NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); notificationHookConsumer.startInternal(configuration, executorService); notificationHookConsumer.instanceIsPassive(); + verify(notificationInterface).close(); verify(executorService).shutdown(); } @@ -261,13 +284,16 @@ public class NotificationHookConsumerTest { public Object answer(InvocationOnMock invocationOnMock) throws Throwable { notificationHookConsumer.consumers.get(0).start(); Thread.sleep(1000); + return null; } }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); notificationHookConsumer.startInternal(configuration, executorService); Thread.sleep(1000); + assertTrue(notificationHookConsumer.consumers.get(0).isAlive()); + notificationHookConsumer.consumers.get(0).shutdown(); } @@ -280,27 +306,32 @@ public class NotificationHookConsumerTest { public Object answer(InvocationOnMock invocationOnMock) throws Throwable { notificationHookConsumer.consumers.get(0).start(); Thread.sleep(500); + return null; } }).when(executorService).submit(any(NotificationHookConsumer.HookConsumer.class)); notificationHookConsumer.startInternal(configuration, executorService); Thread.sleep(500); + notificationHookConsumer.consumers.get(0).shutdown(); Thread.sleep(500); + assertFalse(notificationHookConsumer.consumers.get(0).isAlive()); } private NotificationHookConsumer setupNotificationHookConsumer() throws AtlasException { + List<NotificationConsumer<Object>> consumers = new ArrayList(); + NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); + + consumers.add(notificationConsumerMock); + when(serviceState.getState()).thenReturn(ServiceState.ServiceStateValue.ACTIVE); when(configuration.getBoolean(HAConfiguration.ATLAS_SERVER_HA_ENABLED_KEY, false)).thenReturn(true); when(configuration.getInt(NotificationHookConsumer.CONSUMER_THREADS_PROPERTY, 1)).thenReturn(1); - List<NotificationConsumer<Object>> consumers = new ArrayList(); - NotificationConsumer notificationConsumerMock = mock(NotificationConsumer.class); when(notificationConsumerMock.receive()).thenThrow(new IllegalStateException()); - consumers.add(notificationConsumerMock); + when(notificationInterface.createConsumers(NotificationType.HOOK, 1)).thenReturn(consumers); - when(notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, 1)).thenReturn(consumers); return new NotificationHookConsumer(notificationInterface, atlasEntityStore, serviceState, instanceConverter, typeRegistry); } }