ATLAS-577 Integrate entity audit with DefaultMetadataService (shwethags)
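A minimal sketch (not part of this commit) of the hook-side pattern this change converges the Falcon, Hive, Sqoop and Storm bridges on: a hook now resolves the acting user through AtlasHook.getUser() and attaches it to each HookNotification message instead of calling Atlas directly, and NotificationHookConsumer impersonates that user when it invokes the REST API. ExampleHook and its retry property key below are hypothetical; AtlasHook, HookNotification and Referenceable are the APIs as changed in the diff that follows.

import java.util.Collections;
import java.util.List;

import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;

// Hypothetical hook, for illustration only; not part of this commit.
public class ExampleHook extends AtlasHook {

    @Override
    protected String getNumberOfRetriesPropertyKey() {
        return "atlas.hook.example.numRetries";   // hypothetical property key; notifyEntities() defaults to 3 retries
    }

    public void publish(Referenceable entity) {
        // getUser() falls back in order: supplied user name, ugi.getShortUserName(),
        // UserGroupInformation.getCurrentUser().getShortUserName(), then the "user.name" system property.
        String user = getUser();

        // Messages now carry the user; NotificationHookConsumer builds a remote-user UGI
        // from it and calls Atlas as that user.
        List<HookNotification.HookNotificationMessage> messages =
                Collections.<HookNotification.HookNotificationMessage>singletonList(
                        new HookNotification.EntityCreateRequest(user, entity));

        // Publishes to the HOOK notification topic, retrying on failure.
        notifyEntities(messages);
    }
}
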
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/bca454e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/bca454e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/bca454e1

Branch: refs/heads/master
Commit: bca454e16f0b289b39ab75986e6acdca49488d04
Parents: 985465f
Author: Shwetha GS <[email protected]>
Authored: Thu Mar 31 14:49:12 2016 +0530
Committer: Shwetha GS <[email protected]>
Committed: Thu Mar 31 14:49:12 2016 +0530

----------------------------------------------------------------------
 addons/falcon-bridge/pom.xml | 9 +-
 .../apache/atlas/falcon/hook/FalconHook.java | 97 ++--------
 .../atlas/publisher/FalconEventPublisher.java | 6 +-
 .../apache/atlas/falcon/hook/FalconHookIT.java | 56 +++++-
 addons/hive-bridge/pom.xml | 9 +-
 .../org/apache/atlas/hive/hook/HiveHook.java | 33 ++--
 addons/sqoop-bridge/pom.xml | 9 +-
 .../org/apache/atlas/sqoop/hook/SqoopHook.java | 73 +------
 .../apache/atlas/sqoop/hook/SqoopHookIT.java | 30 ++-
 addons/storm-bridge/pom.xml | 9 +-
 .../apache/atlas/storm/hook/StormAtlasHook.java | 53 +----
 .../atlas/storm/hook/StormAtlasHookIT.java | 39 +++-
 .../atlas/storm/hook/StormAtlasHookTest.java | 68 -------
 .../org/apache/atlas/ApplicationProperties.java | 12 +-
 distro/src/conf/atlas-log4j.xml | 2 +-
 .../java/org/apache/atlas/hook/AtlasHook.java | 85 ++++++---
 .../notification/NotificationHookConsumer.java | 23 +--
 .../atlas/notification/NotificationModule.java | 7 +
 .../NotificationEntityChangeListener.java | 2 +
 .../notification/hook/HookNotification.java | 88 +++++----
 .../NotificationHookConsumerTest.java | 37 +++-
 .../notification/hook/HookNotificationTest.java | 69 ++++---
 pom.xml | 5 +-
 release-log.txt | 1 +
 repository/pom.xml | 1 +
 .../apache/atlas/RepositoryMetadataModule.java | 28 ++-
 .../repository/audit/EntityAuditListener.java | 95 ++++++++++
 .../repository/audit/EntityAuditRepository.java | 37 +++-
 .../audit/HBaseBasedAuditRepository.java | 25 ++-
 .../audit/InMemoryEntityAuditRepository.java | 59 ++++++
 .../graph/GraphBackedSearchIndexer.java | 112 ++++++-----
 .../atlas/services/DefaultMetadataService.java | 61 +++---
 .../GraphBackedDiscoveryServiceTest.java | 7 -
 .../audit/AuditRepositoryTestBase.java | 81 ++++++++
 .../audit/HBaseBasedAuditRepositoryTest.java | 88 +--------
 .../atlas/repository/audit/HBaseTestUtils.java | 57 ++++++
 .../audit/InMemoryAuditRepositoryTest.java | 28 +++
 .../service/DefaultMetadataServiceTest.java | 57 +++++-
 .../DefaultMetadataServiceMockTest.java | 7 +-
 server-api/pom.xml | 1 -
 .../java/org/apache/atlas/RequestContext.java | 55 ++++++
 .../atlas/typesystem/types/TypeSystem.java | 2 -
 .../typesystem/types/TypeSystemProvider.java | 28 +++
 .../main/resources/atlas-application.properties | 8 +
 webapp/pom.xml | 4 +-
 .../web/filters/AtlasAuthenticationFilter.java | 79 +++++++-
 .../apache/atlas/web/filters/AuditFilter.java | 10 +-
 .../atlas/web/listeners/GuiceServletConfig.java | 33 ++--
 .../atlas/web/service/EmbeddedServer.java | 7 +-
 .../NotificationHookConsumerIT.java | 12 +-
 .../AtlasAuthenticationKerberosFilterIT.java | 190 +++++++++++++++++++
 .../AtlasAuthenticationSimpleFilterIT.java | 98 ++++++++++
 .../MetadataAuthenticationKerberosFilterIT.java | 179 -----------------
 .../MetadataAuthenticationSimpleFilterIT.java | 94 ---------
 .../web/listeners/TestGuiceServletConfig.java | 6 +
 .../apache/atlas/web/listeners/TestModule.java | 32 ++++
 .../web/security/BaseSSLAndKerberosTest.java | 8 +-
.../atlas/web/security/BaseSecurityTest.java | 25 ++- .../security/NegativeSSLAndKerberosTest.java | 3 +- .../org/apache/atlas/web/security/SSLTest.java | 23 ++- .../web/service/SecureEmbeddedServerTest.java | 26 ++- .../service/SecureEmbeddedServerTestBase.java | 15 +- 62 files changed, 1558 insertions(+), 945 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml index ccdb512..ad345c5 100644 --- a/addons/falcon-bridge/pom.xml +++ b/addons/falcon-bridge/pom.xml @@ -205,13 +205,18 @@ <daemon>true</daemon> <webApp> <contextPath>/</contextPath> - <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath> </webApp> <useTestScope>true</useTestScope> <systemProperties> <systemProperty> <name>log4j.configuration</name> - <value>atlas-log4j.xml</value> + <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value> + </systemProperty> + <systemProperty> + <name>atlas.log.file</name> + <value>application.log</value> </systemProperty> <systemProperty> <name>atlas.log.dir</name> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index d4b0069..c1ab384 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -20,22 +20,17 @@ package org.apache.atlas.falcon.hook; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; -import com.google.inject.Inject; import com.google.inject.Injector; -import com.sun.jersey.api.client.ClientResponse; -import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasServiceException; import org.apache.atlas.falcon.model.FalconDataModelGenerator; import org.apache.atlas.falcon.model.FalconDataTypes; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; -import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.falcon.atlas.Util.EventUtil; import org.apache.falcon.atlas.event.FalconEvent; @@ -50,8 +45,7 @@ import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.Input; import org.apache.falcon.entity.v0.process.Output; import org.apache.falcon.entity.v0.process.Process; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.security.UserGroupInformation; +import 
org.apache.falcon.security.CurrentUser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,7 +59,7 @@ import java.util.concurrent.TimeUnit; /** * Falcon hook sends lineage information to the Atlas Service. */ -public class FalconHook extends FalconEventPublisher { +public class FalconHook extends AtlasHook implements FalconEventPublisher { private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class); public static final String CONF_PREFIX = "atlas.hook.falcon."; @@ -77,10 +71,6 @@ public class FalconHook extends FalconEventPublisher { public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; - public static final String ATLAS_ENDPOINT = "atlas.rest.address"; - - private static AtlasClient atlasClient; - // wait time determines how long we wait before we exit the jvm on // shutdown. Pending requests after that will not be sent. private static final int WAIT_TIME = 3; @@ -91,20 +81,12 @@ public class FalconHook extends FalconEventPublisher { private static final long keepAliveTimeDefault = 10; private static final int queueSizeDefault = 10000; - private static Configuration atlasProperties; - @Inject - private static NotificationInterface notifInterface; - - public static boolean typesRegistered = false; - private static boolean sync; private static ConfigurationStore STORE; static { try { - atlasProperties = ApplicationProperties.get(); - // initialize the async facility to process hook calls. We don't // want to do this inline since it adds plenty of overhead for the query. int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault); @@ -130,8 +112,6 @@ public class FalconHook extends FalconEventPublisher { // shutdown client } }); - atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT), - EventUtil.getUgi(), EventUtil.getUgi().getShortUserName()); STORE = ConfigurationStore.get(); } catch (Exception e) { @@ -166,7 +146,17 @@ public class FalconHook extends FalconEventPublisher { private void fireAndForget(FalconEvent event) throws Exception { LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation()); - notifyEntity(createEntities(event)); + notifyEntities(getAuthenticatedUser(), createEntities(event)); + } + + private String getAuthenticatedUser() { + String user = null; + try { + user = CurrentUser.getAuthenticatedUser(); + } catch (IllegalArgumentException e) { + LOG.warn("Failed to get user from CurrentUser.getAuthenticatedUser"); + } + return getUser(user, null); } private List<Referenceable> createEntities(FalconEvent event) throws Exception { @@ -179,36 +169,6 @@ public class FalconHook extends FalconEventPublisher { } /** - * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities. - * De-duping of entities is done on server side depending on the unique attribute on the - * - * @param entities entitiies to add - */ - private void notifyEntity(List<Referenceable> entities) { - int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3); - String message = entities.toString(); - - int numRetries = 0; - while (true) { - try { - notifInterface.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest(entities)); - return; - } catch (Exception e) { - numRetries++; - if (numRetries < maxRetries) { - LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e); - } else { - LOG.error("Failed to notify atlas for entity {} after {} retries. 
Quitting", message, - maxRetries, e); - break; - } - } - } - } - - - /** + * Creates process entity + * + * @param event process entity event @@ -324,32 +284,9 @@ public class FalconHook extends FalconEventPublisher { return entities; } - public synchronized void registerFalconDataModel() throws Exception { - if (isDataModelAlreadyRegistered()) { - LOG.info("Falcon data model is already registered!"); - return; - } - - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties, - UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser()); - hiveMetaStoreBridge.registerHiveDataModel(); - - FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator(); - LOG.info("Registering Falcon data model"); - atlasClient.createType(dataModelGenerator.getModelAsJson()); - } - - private boolean isDataModelAlreadyRegistered() throws Exception { - try { - atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); - LOG.info("Hive data model is already registered!"); - return true; - } catch(AtlasServiceException ase) { - if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { - return false; - } - throw ase; - } + @Override + protected String getNumberOfRetriesPropertyKey() { + return HOOK_NUM_RETRIES; } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java index 3522339..8029be9 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java +++ b/addons/falcon-bridge/src/main/java/org/apache/falcon/atlas/publisher/FalconEventPublisher.java @@ -24,8 +24,8 @@ import org.apache.falcon.atlas.event.FalconEvent; /** * Falcon publisher for Atlas */ -public abstract class FalconEventPublisher { - public static class Data { +public interface FalconEventPublisher { + class Data { private FalconEvent event; public Data(FalconEvent event) { @@ -37,5 +37,5 @@ public abstract class FalconEventPublisher { } } - public abstract void publish(final Data data) throws Exception; + void publish(final Data data) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java ---------------------------------------------------------------------- diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java index aaffa4a..4249a8f 100644 --- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java +++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java @@ -18,12 +18,16 @@ package org.apache.atlas.falcon.hook; +import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.falcon.model.FalconDataModelGenerator; import org.apache.atlas.falcon.model.FalconDataTypes; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.typesystem.Referenceable; import 
org.apache.atlas.typesystem.persistence.Id; +import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.RandomStringUtils; import org.apache.falcon.atlas.service.AtlasService; import org.apache.falcon.entity.store.ConfigurationStore; @@ -33,6 +37,8 @@ import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.security.CurrentUser; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.security.UserGroupInformation; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; @@ -54,21 +60,51 @@ public class FalconHookIT { public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml"; public static final String PROCESS_RESOURCE = "/process.xml"; - private AtlasClient dgiCLient; + private AtlasClient atlasClient; private static final ConfigurationStore STORE = ConfigurationStore.get(); + private Configuration atlasProperties; @BeforeClass public void setUp() throws Exception { - dgiCLient = new AtlasClient(ApplicationProperties.get().getString("atlas.rest.address")); + atlasProperties = ApplicationProperties.get(); + atlasClient = new AtlasClient(atlasProperties.getString("atlas.rest.address")); AtlasService service = new AtlasService(); service.init(); STORE.registerListener(service); - new FalconHook().registerFalconDataModel(); + registerFalconDataModel(); CurrentUser.authenticate(System.getProperty("user.name")); } + private void registerFalconDataModel() throws Exception { + if (isDataModelAlreadyRegistered()) { + LOG.info("Falcon data model is already registered!"); + return; + } + + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasProperties, + UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser()); + hiveMetaStoreBridge.registerHiveDataModel(); + + FalconDataModelGenerator dataModelGenerator = new FalconDataModelGenerator(); + LOG.info("Registering Falcon data model"); + atlasClient.createType(dataModelGenerator.getModelAsJson()); + } + + private boolean isDataModelAlreadyRegistered() throws Exception { + try { + atlasClient.getType(FalconDataTypes.FALCON_PROCESS_ENTITY.getName()); + LOG.info("Hive data model is already registered!"); + return true; + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + return false; + } + throw ase; + } + } + private <T extends Entity> T loadEntity(EntityType type, String resource, String name) throws JAXBException { Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource)); switch (entity.getEntityType()) { @@ -115,17 +151,17 @@ public class FalconHookIT { STORE.publish(EntityType.PROCESS, process); String pid = assertProcessIsRegistered(cluster.getName(), process.getName()); - Referenceable processEntity = dgiCLient.getEntity(pid); + Referenceable processEntity = atlasClient.getEntity(pid); assertNotNull(processEntity); assertEquals(processEntity.get("processName"), process.getName()); Id inId = (Id) ((List)processEntity.get("inputs")).get(0); - Referenceable inEntity = dgiCLient.getEntity(inId._getId()); + Referenceable inEntity = atlasClient.getEntity(inId._getId()); assertEquals(inEntity.get("name"), HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), inDbName, inTableName)); Id outId = (Id) ((List)processEntity.get("outputs")).get(0); - Referenceable 
outEntity = dgiCLient.getEntity(outId._getId()); + Referenceable outEntity = atlasClient.getEntity(outId._getId()); assertEquals(outEntity.get("name"), HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName)); } @@ -173,12 +209,12 @@ public class FalconHookIT { STORE.publish(EntityType.PROCESS, process); String pid = assertProcessIsRegistered(cluster.getName(), process.getName()); - Referenceable processEntity = dgiCLient.getEntity(pid); + Referenceable processEntity = atlasClient.getEntity(pid); assertEquals(processEntity.get("processName"), process.getName()); assertNull(processEntity.get("inputs")); Id outId = (Id) ((List)processEntity.get("outputs")).get(0); - Referenceable outEntity = dgiCLient.getEntity(outId._getId()); + Referenceable outEntity = atlasClient.getEntity(outId._getId()); assertEquals(outEntity.get("name"), HiveMetaStoreBridge.getTableQualifiedName(cluster.getName(), outDbName, outTableName)); } @@ -209,13 +245,13 @@ public class FalconHookIT { waitFor(2000000, new Predicate() { @Override public boolean evaluate() throws Exception { - JSONArray results = dgiCLient.search(query); + JSONArray results = atlasClient.search(query); System.out.println(results); return results.length() == 1; } }); - JSONArray results = dgiCLient.search(query); + JSONArray results = atlasClient.search(query); JSONObject row = results.getJSONObject(0).getJSONObject("t"); return row.getString("id"); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/hive-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/pom.xml b/addons/hive-bridge/pom.xml index adb4f3a..8bfbb13 100755 --- a/addons/hive-bridge/pom.xml +++ b/addons/hive-bridge/pom.xml @@ -283,13 +283,18 @@ <daemon>true</daemon> <webApp> <contextPath>/</contextPath> - <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath> </webApp> <useTestScope>true</useTestScope> <systemProperties> <systemProperty> <name>log4j.configuration</name> - <value>atlas-log4j.xml</value> + <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value> + </systemProperty> + <systemProperty> + <name>atlas.log.file</name> + <value>application.log</value> </systemProperty> <systemProperty> <name>atlas.log.dir</name> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 16ed452..f313f2e 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -20,14 +20,12 @@ package org.apache.atlas.hive.hook; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.atlas.ApplicationProperties; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; 
-import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; @@ -86,8 +84,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final long keepAliveTimeDefault = 10; private static final int queueSizeDefault = 10000; - private static Configuration atlasProperties; - class HiveEvent { public Set<ReadEntity> inputs; public Set<WriteEntity> outputs; @@ -108,8 +104,6 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { static { try { - atlasProperties = ApplicationProperties.get(); - // initialize the async facility to process hook calls. We don't // want to do this inline since it adds plenty of overhead for the query. int minThreads = atlasProperties.getInt(MIN_THREADS, minThreadsDefault); @@ -166,7 +160,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { event.inputs = hookContext.getInputs(); event.outputs = hookContext.getOutputs(); - event.user = hookContext.getUserName() == null ? hookContext.getUgi().getUserName() : hookContext.getUserName(); + event.user = getUser(hookContext.getUserName(), hookContext.getUgi()); event.ugi = hookContext.getUgi(); event.operation = OPERATION_MAP.get(hookContext.getOperationName()); event.hookType = hookContext.getHookType(); @@ -258,7 +252,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { for (WriteEntity writeEntity : event.outputs) { if (writeEntity.getType() == Type.DATABASE) { //Create/update table entity - createOrUpdateEntities(dgiBridge, writeEntity); + createOrUpdateEntities(dgiBridge, event.user, writeEntity); } } } @@ -271,7 +265,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { //Below check should filter out partition related if (writeEntity.getType() == Entity.Type.TABLE) { //Create/update table entity - createOrUpdateEntities(dgiBridge, writeEntity); + createOrUpdateEntities(dgiBridge, event.user, writeEntity); } } } @@ -292,7 +286,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { .equals(oldTable.getTableName())) { //Create/update old table entity - create new entity and replace id - Referenceable tableEntity = createOrUpdateEntities(dgiBridge, writeEntity); + Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.user, writeEntity); String oldQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), oldTable.getDbName(), oldTable.getTableName()); tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName); @@ -304,14 +298,15 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName); newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase()); - messages.add(new HookNotification.EntityPartialUpdateRequest(HiveDataTypes.HIVE_TABLE.getName(), - HiveDataModelGenerator.NAME, oldQualifiedName, newEntity)); + messages.add(new HookNotification.EntityPartialUpdateRequest(event.user, + HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME, + oldQualifiedName, newEntity)); } } } } - private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, Entity entity) throws Exception { + private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity) 
throws Exception { Database db = null; Table table = null; Partition partition = null; @@ -351,14 +346,14 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { entities.add(partitionEntity); } - messages.add(new HookNotification.EntityUpdateRequest(entities)); + messages.add(new HookNotification.EntityUpdateRequest(user, entities)); return tableEntity; } private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception { for (WriteEntity entity : event.outputs) { if (entity.getType() == entityType) { - createOrUpdateEntities(dgiBridge, entity); + createOrUpdateEntities(dgiBridge, event.user, entity); } } } @@ -396,7 +391,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { List<Referenceable> source = new ArrayList<>(); for (ReadEntity readEntity : inputs) { if (readEntity.getType() == Type.TABLE || readEntity.getType() == Type.PARTITION) { - Referenceable inTable = createOrUpdateEntities(dgiBridge, readEntity); + Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity); source.add(inTable); } } @@ -405,7 +400,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { List<Referenceable> target = new ArrayList<>(); for (WriteEntity writeEntity : outputs) { if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) { - Referenceable outTable = createOrUpdateEntities(dgiBridge, writeEntity); + Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity); target.add(outTable); } } @@ -417,7 +412,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { //TODO set processReferenceable.set("queryGraph", "queryGraph"); - messages.add(new HookNotification.EntityCreateRequest(processReferenceable)); + messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable)); } @@ -432,6 +427,4 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return new JSONObject(); } } - - } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/pom.xml b/addons/sqoop-bridge/pom.xml index 0927c8f..343bb4e 100644 --- a/addons/sqoop-bridge/pom.xml +++ b/addons/sqoop-bridge/pom.xml @@ -288,13 +288,18 @@ <daemon>true</daemon> <webApp> <contextPath>/</contextPath> - <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath> </webApp> <useTestScope>true</useTestScope> <systemProperties> <systemProperty> <name>log4j.configuration</name> - <value>atlas-log4j.xml</value> + <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value> + </systemProperty> + <systemProperty> + <name>atlas.log.file</name> + <value>application.log</value> </systemProperty> <systemProperty> <name>atlas.log.dir</name> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java index b573ac4..924e467 100644 --- 
a/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java +++ b/addons/sqoop-bridge/src/main/java/org/apache/atlas/sqoop/hook/SqoopHook.java @@ -19,31 +19,24 @@ package org.apache.atlas.sqoop.hook; -import com.google.inject.Guice; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasServiceException; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.notification.NotificationInterface; -import org.apache.atlas.notification.NotificationModule; +import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.sqoop.model.SqoopDataModelGenerator; import org.apache.atlas.sqoop.model.SqoopDataTypes; import org.apache.atlas.typesystem.Referenceable; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.sqoop.SqoopJobDataPublisher; import org.apache.sqoop.util.ImportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -55,43 +48,16 @@ import java.util.Properties; public class SqoopHook extends SqoopJobDataPublisher { private static final Logger LOG = LoggerFactory.getLogger(SqoopHook.class); - private static final String DEFAULT_DGI_URL = "http://localhost:21000/"; public static final String CONF_PREFIX = "atlas.hook.sqoop."; public static final String HOOK_NUM_RETRIES = CONF_PREFIX + "numRetries"; public static final String ATLAS_CLUSTER_NAME = "atlas.cluster.name"; public static final String DEFAULT_CLUSTER_NAME = "primary"; - public static final String ATLAS_REST_ADDRESS = "atlas.rest.address"; - - @Inject - private static NotificationInterface notifInterface; static { org.apache.hadoop.conf.Configuration.addDefaultResource("sqoop-site.xml"); } - synchronized void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception { - // Make sure hive model exists - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf, - UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser()); - hiveMetaStoreBridge.registerHiveDataModel(); - SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator(); - - //Register sqoop data model if its not already registered - try { - client.getType(SqoopDataTypes.SQOOP_PROCESS.getName()); - LOG.info("Sqoop data model is already registered!"); - } catch(AtlasServiceException ase) { - if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { - //Expected in case types do not exist - LOG.info("Registering Sqoop data model"); - client.createType(dataModelGenerator.getModelAsJson()); - } else { - throw ase; - } - } - } - public Referenceable createHiveDatabaseInstance(String clusterName, String dbName) throws Exception { Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); @@ -182,12 +148,7 @@ public class SqoopHook extends SqoopJobDataPublisher { @Override public void publish(SqoopJobDataPublisher.Data data) throws Exception { - Injector injector = Guice.createInjector(new 
NotificationModule()); - notifInterface = injector.getInstance(NotificationInterface.class); - Configuration atlasProperties = ApplicationProperties.get(); - AtlasClient atlasClient = new AtlasClient(atlasProperties.getString(ATLAS_REST_ADDRESS, DEFAULT_DGI_URL), - UserGroupInformation.getCurrentUser(), UserGroupInformation.getCurrentUser().getShortUserName()); org.apache.hadoop.conf.Configuration sqoopConf = new org.apache.hadoop.conf.Configuration(); String clusterName = sqoopConf.get(ATLAS_CLUSTER_NAME, DEFAULT_CLUSTER_NAME); @@ -197,33 +158,9 @@ public class SqoopHook extends SqoopJobDataPublisher { data.getHiveTable(), data.getHiveDB()); Referenceable procRef = createSqoopProcessInstance(dbStoreRef, hiveTableRef, data, clusterName); - notifyEntity(atlasProperties, dbStoreRef, dbRef, hiveTableRef, procRef); - } - - /** - * Notify atlas of the entity through message. The entity can be a complex entity with reference to other entities. - * De-duping of entities is done on server side depending on the unique attribute on the - * @param entities - Entity references to publish. - */ - private void notifyEntity(Configuration atlasProperties, Referenceable... entities) { int maxRetries = atlasProperties.getInt(HOOK_NUM_RETRIES, 3); - - int numRetries = 0; - while (true) { - try { - notifInterface.send(NotificationInterface.NotificationType.HOOK, - new HookNotification.EntityCreateRequest(entities)); - return; - } catch(Exception e) { - numRetries++; - if(numRetries < maxRetries) { - LOG.debug("Failed to notify atlas for entity {}. Retrying", entities, e); - } else { - LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting", entities, - maxRetries, e); - break; - } - } - } + HookNotification.HookNotificationMessage message = + new HookNotification.EntityCreateRequest(AtlasHook.getUser(), dbStoreRef, dbRef, hiveTableRef, procRef); + AtlasHook.notifyEntities(Arrays.asList(message), maxRetries); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java ---------------------------------------------------------------------- diff --git a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java index 94cd105..0e4658a 100644 --- a/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java +++ b/addons/sqoop-bridge/src/test/java/org/apache/atlas/sqoop/hook/SqoopHookIT.java @@ -18,11 +18,17 @@ package org.apache.atlas.sqoop.hook; +import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataTypes; +import org.apache.atlas.sqoop.model.SqoopDataModelGenerator; import org.apache.atlas.sqoop.model.SqoopDataTypes; import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.sqoop.SqoopJobDataPublisher; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONObject; @@ -44,7 +50,29 @@ public class SqoopHookIT { //Set-up sqoop session Configuration configuration = ApplicationProperties.get(); dgiCLient = new AtlasClient(configuration.getString("atlas.rest.address")); - new SqoopHook().registerDataModels(dgiCLient, 
configuration); + registerDataModels(dgiCLient, configuration); + } + + private void registerDataModels(AtlasClient client, Configuration atlasConf) throws Exception { + // Make sure hive model exists + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasConf, + UserGroupInformation.getCurrentUser().getShortUserName(), UserGroupInformation.getCurrentUser()); + hiveMetaStoreBridge.registerHiveDataModel(); + SqoopDataModelGenerator dataModelGenerator = new SqoopDataModelGenerator(); + + //Register sqoop data model if its not already registered + try { + client.getType(SqoopDataTypes.SQOOP_PROCESS.getName()); + LOG.info("Sqoop data model is already registered!"); + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + //Expected in case types do not exist + LOG.info("Registering Sqoop data model"); + client.createType(dataModelGenerator.getModelAsJson()); + } else { + throw ase; + } + } } @Test http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/pom.xml ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/pom.xml b/addons/storm-bridge/pom.xml index 76c4507..e3b4ed7 100644 --- a/addons/storm-bridge/pom.xml +++ b/addons/storm-bridge/pom.xml @@ -318,13 +318,18 @@ <daemon>true</daemon> <webApp> <contextPath>/</contextPath> - <descriptor>../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <descriptor>${project.basedir}/../../webapp/src/test/webapp/WEB-INF/web.xml</descriptor> + <extraClasspath>${project.basedir}/../../webapp/target/test-classes/</extraClasspath> </webApp> <useTestScope>true</useTestScope> <systemProperties> <systemProperty> <name>log4j.configuration</name> - <value>atlas-log4j.xml</value> + <value>file://${project.basedir}/../../distro/src/conf/atlas-log4j.xml</value> + </systemProperty> + <systemProperty> + <name>atlas.log.file</name> + <value>application.log</value> </systemProperty> <systemProperty> <name>atlas.log.dir</name> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 4c0004b..620f929 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -24,20 +24,13 @@ import backtype.storm.generated.SpoutSpec; import backtype.storm.generated.StormTopology; import backtype.storm.generated.TopologyInfo; import backtype.storm.utils.Utils; -import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasConstants; -import org.apache.atlas.AtlasException; -import org.apache.atlas.AtlasServiceException; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; -import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.hook.AtlasHook; -import org.apache.atlas.storm.model.StormDataModel; import org.apache.atlas.storm.model.StormDataTypes; import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.TypesDef; -import org.apache.atlas.typesystem.json.TypesSerialization; import 
org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -70,15 +63,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { public static final String HBASE_NAMESPACE_DEFAULT = "default"; - private static volatile boolean typesRegistered = false; - - public StormAtlasHook() { - super(); - } - - StormAtlasHook(AtlasClient atlasClient) { - super(atlasClient); - } @Override protected String getNumberOfRetriesPropertyKey() { return HOOK_NUM_RETRIES; @@ -113,7 +97,8 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { entities.add(topologyReferenceable); LOG.debug("notifying entities, size = {}", entities.size()); - notifyEntities(entities); + String user = getUser(topologyInfo.get_owner(), null); + notifyEntities(user, entities); } catch (Exception e) { throw new RuntimeException("Atlas hook is unable to process the topology.", e); } @@ -379,38 +364,6 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { return String.format("%s.%s@%s", nameSpace, tableName, clusterName); } - public synchronized void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException, - AtlasServiceException { - - try { - atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName()); - LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model"); - } catch(AtlasServiceException ase) { - if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { - //Expected in case types do not exist - LOG.info("Registering Hive data model"); - atlasClient.createType(dataModelGenerator.getModelAsJson()); - } else { - throw ase; - } - } - - - try { - atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName()); - } catch(AtlasServiceException ase) { - if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { - LOG.info("Registering Storm/Kafka data model"); - StormDataModel.main(new String[]{}); - TypesDef typesDef = StormDataModel.typesDef(); - String stormTypesAsJSON = TypesSerialization.toJson(typesDef); - LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON); - atlasClient.createType(stormTypesAsJSON); - } - } - typesRegistered = true; - } - private String getClusterName(Map stormConf) { String clusterName = AtlasConstants.DEFAULT_CLUSTER_NAME; if (stormConf.containsKey(AtlasConstants.CLUSTER_NAME_KEY)) { @@ -418,6 +371,4 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { } return clusterName; } - - } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java index 79f1b07..4648d24 100644 --- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java +++ b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookIT.java @@ -20,9 +20,13 @@ package org.apache.atlas.storm.hook; import backtype.storm.ILocalCluster; import backtype.storm.generated.StormTopology; +import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; +import org.apache.atlas.AtlasException; +import org.apache.atlas.AtlasServiceException; import 
org.apache.atlas.hive.model.HiveDataModelGenerator; +import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.storm.model.StormDataModel; import org.apache.atlas.storm.model.StormDataTypes; import org.apache.atlas.typesystem.Referenceable; @@ -57,9 +61,40 @@ public class StormAtlasHookIT { Configuration configuration = ApplicationProperties.get(); atlasClient = new AtlasClient(configuration.getString("atlas.rest.address", ATLAS_URL)); - new StormAtlasHook().registerDataModel(new HiveDataModelGenerator()); + registerDataModel(new HiveDataModelGenerator()); } + private void registerDataModel(HiveDataModelGenerator dataModelGenerator) throws AtlasException, + AtlasServiceException { + try { + atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName()); + LOG.info("Hive data model is already registered! Going ahead with registration of Storm Data model"); + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + //Expected in case types do not exist + LOG.info("Registering Hive data model"); + atlasClient.createType(dataModelGenerator.getModelAsJson()); + } else { + throw ase; + } + } + + + try { + atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName()); + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + LOG.info("Registering Storm/Kafka data model"); + StormDataModel.main(new String[]{}); + TypesDef typesDef = StormDataModel.typesDef(); + String stormTypesAsJSON = TypesSerialization.toJson(typesDef); + LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON); + atlasClient.createType(stormTypesAsJSON); + } + } + } + + @AfterClass public void tearDown() throws Exception { LOG.info("Shutting down storm local cluster"); @@ -76,7 +111,7 @@ public class StormAtlasHookIT { String stormTypesAsJSON = TypesSerialization.toJson(stormTypesDef); LOG.info("stormTypesAsJSON = {}", stormTypesAsJSON); - new StormAtlasHook().registerDataModel(new HiveDataModelGenerator()); + registerDataModel(new HiveDataModelGenerator()); // verify types are registered for (StormDataTypes stormDataType : StormDataTypes.values()) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java b/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java deleted file mode 100644 index 51840a5..0000000 --- a/addons/storm-bridge/src/test/java/org/apache/atlas/storm/hook/StormAtlasHookTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.atlas.storm.hook; - -import com.sun.jersey.api.client.ClientResponse; -import org.apache.atlas.AtlasClient; -import org.apache.atlas.AtlasException; -import org.apache.atlas.AtlasServiceException; -import org.apache.atlas.hive.model.HiveDataModelGenerator; -import org.apache.atlas.hive.model.HiveDataTypes; -import org.apache.atlas.storm.model.StormDataTypes; -import org.testng.annotations.Test; - -import static org.mockito.Matchers.contains; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -@Test -public class StormAtlasHookTest { - - @Test - public void testStormRegistersHiveDataModelIfNotPresent() throws AtlasException, AtlasServiceException { - AtlasClient atlasClient = mock(AtlasClient.class); - HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class); - AtlasServiceException atlasServiceException = mock(AtlasServiceException.class); - when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND); - when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenThrow(atlasServiceException); - String hiveModel = "{hive_model_as_json}"; - when(dataModelGenerator.getModelAsJson()).thenReturn(hiveModel); - - StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient); - stormAtlasHook.registerDataModel(dataModelGenerator); - - verify(atlasClient).createType(hiveModel); - } - - @Test - public void testStormRegistersStormModelIfNotPresent() throws AtlasServiceException, AtlasException { - AtlasClient atlasClient = mock(AtlasClient.class); - HiveDataModelGenerator dataModelGenerator = mock(HiveDataModelGenerator.class); - when(atlasClient.getType(HiveDataTypes.HIVE_PROCESS.getName())).thenReturn("hive_process_definition"); - AtlasServiceException atlasServiceException = mock(AtlasServiceException.class); - when(atlasServiceException.getStatus()).thenReturn(ClientResponse.Status.NOT_FOUND); - when(atlasClient.getType(StormDataTypes.STORM_TOPOLOGY.getName())).thenThrow(atlasServiceException); - - StormAtlasHook stormAtlasHook = new StormAtlasHook(atlasClient); - stormAtlasHook.registerDataModel(dataModelGenerator); - - verify(atlasClient).createType(contains("storm_topology")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/common/src/main/java/org/apache/atlas/ApplicationProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/ApplicationProperties.java b/common/src/main/java/org/apache/atlas/ApplicationProperties.java index d74a30e..ca72ffd 100644 --- a/common/src/main/java/org/apache/atlas/ApplicationProperties.java +++ b/common/src/main/java/org/apache/atlas/ApplicationProperties.java @@ -35,12 +35,22 @@ public final class ApplicationProperties extends PropertiesConfiguration { public static final String APPLICATION_PROPERTIES = "atlas-application.properties"; - private static Configuration instance = null; + private static volatile Configuration instance = null; private ApplicationProperties(URL url) throws ConfigurationException { super(url); } + public static void forceReload() { + if (instance != null) { + synchronized (ApplicationProperties.class) { + if (instance != null) { + instance = null; + } + } + } + } + public static Configuration get() throws AtlasException { if (instance == null) { synchronized (ApplicationProperties.class) { 
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/distro/src/conf/atlas-log4j.xml ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-log4j.xml b/distro/src/conf/atlas-log4j.xml index 6071703..1ac4082 100755 --- a/distro/src/conf/atlas-log4j.xml +++ b/distro/src/conf/atlas-log4j.xml @@ -55,7 +55,7 @@ <appender-ref ref="FILE"/> </logger> - <logger name="AUDIT"> + <logger name="AUDIT" additivity="false"> <level value="info"/> <appender-ref ref="AUDIT"/> </logger> http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java index 2e41c5c..7e09a19 100644 --- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java +++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java @@ -19,20 +19,21 @@ package org.apache.atlas.hook; import com.google.inject.Guice; -import com.google.inject.Inject; import com.google.inject.Injector; import org.apache.atlas.ApplicationProperties; -import org.apache.atlas.AtlasClient; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.notification.NotificationModule; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.json.InstanceSerialization; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.security.UserGroupInformation; import org.codehaus.jettison.json.JSONArray; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -44,25 +45,19 @@ import java.util.List; public abstract class AtlasHook { private static final Logger LOG = LoggerFactory.getLogger(AtlasHook.class); - private static final String DEFAULT_ATLAS_URL = "http://localhost:21000/"; - - public static final String ATLAS_ENDPOINT = "atlas.rest.address"; - - protected final AtlasClient atlasClient; /** * Hadoop Cluster name for this instance, typically used for namespace. 
*/ protected static Configuration atlasProperties; - @Inject protected static NotificationInterface notifInterface; static { try { atlasProperties = ApplicationProperties.get(); } catch (Exception e) { - LOG.info("Attempting to send msg while shutdown in progress.", e); + LOG.info("Failed to load application properties", e); } Injector injector = Guice.createInjector(new NotificationModule()); @@ -71,18 +66,9 @@ public abstract class AtlasHook { LOG.info("Created Atlas Hook"); } - public AtlasHook() { - this(new AtlasClient(atlasProperties.getString(ATLAS_ENDPOINT, DEFAULT_ATLAS_URL))); - } - - public AtlasHook(AtlasClient atlasClient) { - this.atlasClient = atlasClient; - //TODO - take care of passing in - ugi, doAsUser for secure cluster - } - protected abstract String getNumberOfRetriesPropertyKey(); - protected void notifyEntities(Collection<Referenceable> entities) { + protected void notifyEntities(String user, Collection<Referenceable> entities) { JSONArray entitiesArray = new JSONArray(); for (Referenceable entity : entities) { @@ -92,27 +78,26 @@ public abstract class AtlasHook { } List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>(); - hookNotificationMessages.add(new HookNotification.EntityCreateRequest(entitiesArray)); + hookNotificationMessages.add(new HookNotification.EntityCreateRequest(user, entitiesArray)); notifyEntities(hookNotificationMessages); } /** - * Notify atlas - * of the entity through message. The entity can be a + * Notify atlas of the entity through message. The entity can be a * complex entity with reference to other entities. * De-duping of entities is done on server side depending on the * unique attribute on the entities. * - * @param entities entities + * @param messages hook notification messages + * @param maxRetries maximum number of retries while sending message to messaging system */ - protected void notifyEntities(List<HookNotification.HookNotificationMessage> entities) { - final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3); - final String message = entities.toString(); + public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) { + final String message = messages.toString(); int numRetries = 0; while (true) { try { - notifInterface.send(NotificationInterface.NotificationType.HOOK, entities); + notifInterface.send(NotificationInterface.NotificationType.HOOK, messages); return; } catch(Exception e) { numRetries++; @@ -125,4 +110,50 @@ public abstract class AtlasHook { } } } + + /** + * Notify atlas of the entity through message. The entity can be a + * complex entity with reference to other entities. + * De-duping of entities is done on server side depending on the + * unique attribute on the entities. + * + * @param messages hook notification messages + */ + protected void notifyEntities(List<HookNotification.HookNotificationMessage> messages) { + final int maxRetries = atlasProperties.getInt(getNumberOfRetriesPropertyKey(), 3); + notifyEntities(messages, maxRetries); + } + + /** + * Returns the logged in user. + * @return + */ + public static String getUser() { + return getUser(null, null); + } + + /** + * Returns the user. Order of preference: + * 1. Given userName + * 2. ugi.getShortUserName() + * 3. UserGroupInformation.getCurrentUser().getShortUserName() + * 4. 
System.getProperty("user.name") + */ + + public static String getUser(String userName, UserGroupInformation ugi) { + if (StringUtils.isNotEmpty(userName)) { + return userName; + } + + if (ugi != null && StringUtils.isNotEmpty(ugi.getShortUserName())) { + return ugi.getShortUserName(); + } + + try { + return UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException e) { + LOG.warn("Failed for UserGroupInformation.getCurrentUser()"); + return System.getProperty("user.name"); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java index 015af44..2fcbcd3 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java @@ -26,6 +26,7 @@ import org.apache.atlas.AtlasException; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.service.Service; import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,14 +49,13 @@ public class NotificationHookConsumer implements Service { @Inject private NotificationInterface notificationInterface; private ExecutorService executors; - private AtlasClient atlasClient; + private String atlasEndpoint; @Override public void start() throws AtlasException { Configuration applicationProperties = ApplicationProperties.get(); - String atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); - atlasClient = new AtlasClient(atlasEndpoint); + atlasEndpoint = applicationProperties.getString(ATLAS_ENDPOINT_PROPERTY, "http://localhost:21000"); int numThreads = applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1); List<NotificationConsumer<HookNotification.HookNotificationMessage>> consumers = notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads); @@ -87,15 +87,8 @@ public class NotificationHookConsumer implements Service { class HookConsumer implements Runnable { private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer; - private final AtlasClient client; public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { - this(atlasClient, consumer); - } - - public HookConsumer(AtlasClient client, - NotificationConsumer<HookNotification.HookNotificationMessage> consumer) { - this.client = client; this.consumer = consumer; } @@ -118,6 +111,9 @@ public class NotificationHookConsumer implements Service { try { if (hasNext()) { HookNotification.HookNotificationMessage message = consumer.next(); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser()); + AtlasClient atlasClient = getAtlasClient(ugi); + try { switch (message.getType()) { case ENTITY_CREATE: @@ -154,9 +150,14 @@ public class NotificationHookConsumer implements Service { } } + protected AtlasClient getAtlasClient(UserGroupInformation ugi) { + return new AtlasClient(atlasEndpoint, ugi, ugi.getShortUserName()); + } + boolean serverAvailable(Timer timer) { try { - while 
(!client.isServerReady()) { + AtlasClient atlasClient = getAtlasClient(UserGroupInformation.getCurrentUser()); + while (!atlasClient.isServerReady()) { try { LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", SERVER_READY_WAIT_TIME_MS); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java index c20fdf1..e8ae177 100644 --- a/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java +++ b/notification/src/main/java/org/apache/atlas/notification/NotificationModule.java @@ -22,6 +22,8 @@ import com.google.inject.Singleton; import com.google.inject.multibindings.Multibinder; import org.apache.atlas.kafka.KafkaNotification; import org.apache.atlas.kafka.KafkaNotificationProvider; +import org.apache.atlas.listener.EntityChangeListener; +import org.apache.atlas.notification.entity.NotificationEntityChangeListener; import org.apache.atlas.service.Service; /** @@ -37,5 +39,10 @@ public class NotificationModule extends AbstractModule { Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder(), Service.class); serviceBinder.addBinding().to(KafkaNotification.class); serviceBinder.addBinding().to(NotificationHookConsumer.class); + + //Add NotificationEntityChangeListener as EntityChangeListener + Multibinder<EntityChangeListener> entityChangeListenerBinder = + Multibinder.newSetBinder(binder(), EntityChangeListener.class); + entityChangeListenerBinder.addBinding().to(NotificationEntityChangeListener.class); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java index 31f5c2b..300cbb5 100644 --- a/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java +++ b/notification/src/main/java/org/apache/atlas/notification/entity/NotificationEntityChangeListener.java @@ -17,6 +17,7 @@ */ package org.apache.atlas.notification.entity; +import com.google.inject.Inject; import org.apache.atlas.AtlasException; import org.apache.atlas.listener.EntityChangeListener; import org.apache.atlas.notification.NotificationInterface; @@ -48,6 +49,7 @@ public class NotificationEntityChangeListener implements EntityChangeListener { * @param notificationInterface the notification framework interface * @param typeSystem the Atlas type system */ + @Inject public NotificationEntityChangeListener(NotificationInterface notificationInterface, TypeSystem typeSystem) { this.notificationInterface = notificationInterface; this.typeSystem = typeSystem; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java ---------------------------------------------------------------------- diff --git a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java 
b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java index a000161..4c7f6de 100644 --- a/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java +++ b/notification/src/main/java/org/apache/atlas/notification/hook/HookNotification.java @@ -25,6 +25,7 @@ import com.google.gson.JsonParseException; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.TypesDef; import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.commons.lang.StringUtils; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; @@ -41,29 +42,24 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN @Override public HookNotificationMessage deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) { - if (json.isJsonArray()) { - JSONArray jsonArray = context.deserialize(json, JSONArray.class); - return new EntityCreateRequest(jsonArray); - } else { - HookNotificationType type = - context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class); - switch (type) { - case ENTITY_CREATE: - return context.deserialize(json, EntityCreateRequest.class); - - case ENTITY_FULL_UPDATE: - return context.deserialize(json, EntityUpdateRequest.class); - - case ENTITY_PARTIAL_UPDATE: - return context.deserialize(json, EntityPartialUpdateRequest.class); - - case TYPE_CREATE: - case TYPE_UPDATE: - return context.deserialize(json, TypeRequest.class); - - default: - throw new IllegalStateException("Unhandled type " + type); - } + HookNotificationType type = + context.deserialize(((JsonObject) json).get("type"), HookNotificationType.class); + switch (type) { + case ENTITY_CREATE: + return context.deserialize(json, EntityCreateRequest.class); + + case ENTITY_FULL_UPDATE: + return context.deserialize(json, EntityUpdateRequest.class); + + case ENTITY_PARTIAL_UPDATE: + return context.deserialize(json, EntityPartialUpdateRequest.class); + + case TYPE_CREATE: + case TYPE_UPDATE: + return context.deserialize(json, TypeRequest.class); + + default: + throw new IllegalStateException("Unhandled type " + type); } } @@ -78,18 +74,30 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN * Base type of hook message. */ public static class HookNotificationMessage { + public static final String UNKNOW_USER = "UNKNOWN"; protected HookNotificationType type; + protected String user; private HookNotificationMessage() { } - public HookNotificationMessage(HookNotificationType type) { + public HookNotificationMessage(HookNotificationType type, String user) { this.type = type; + this.user = user; } public HookNotificationType getType() { return type; } + + public String getUser() { + if (StringUtils.isEmpty(user)) { + return UNKNOW_USER; + } + return user; + } + + } /** @@ -101,8 +109,8 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN private TypeRequest() { } - public TypeRequest(HookNotificationType type, TypesDef typesDef) { - super(type); + public TypeRequest(HookNotificationType type, TypesDef typesDef, String user) { + super(type, user); this.typesDef = typesDef; } @@ -120,21 +128,21 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN private EntityCreateRequest() { } - public EntityCreateRequest(Referenceable... entities) { - this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities)); + public EntityCreateRequest(String user, Referenceable... 
entities) { + this(HookNotificationType.ENTITY_CREATE, Arrays.asList(entities), user); } - public EntityCreateRequest(List<Referenceable> entities) { - this(HookNotificationType.ENTITY_CREATE, entities); + public EntityCreateRequest(String user, List<Referenceable> entities) { + this(HookNotificationType.ENTITY_CREATE, entities, user); } - protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities) { - super(type); + protected EntityCreateRequest(HookNotificationType type, List<Referenceable> entities, String user) { + super(type, user); this.entities = entities; } - public EntityCreateRequest(JSONArray jsonArray) { - super(HookNotificationType.ENTITY_CREATE); + public EntityCreateRequest(String user, JSONArray jsonArray) { + super(HookNotificationType.ENTITY_CREATE, user); entities = new ArrayList<>(); for (int index = 0; index < jsonArray.length(); index++) { try { @@ -154,12 +162,12 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN * Hook message for updating entities(full update). */ public static class EntityUpdateRequest extends EntityCreateRequest { - public EntityUpdateRequest(Referenceable... entities) { - this(Arrays.asList(entities)); + public EntityUpdateRequest(String user, Referenceable... entities) { + this(user, Arrays.asList(entities)); } - public EntityUpdateRequest(List<Referenceable> entities) { - super(HookNotificationType.ENTITY_FULL_UPDATE, entities); + public EntityUpdateRequest(String user, List<Referenceable> entities) { + super(HookNotificationType.ENTITY_FULL_UPDATE, entities, user); } } @@ -175,9 +183,9 @@ public class HookNotification implements JsonDeserializer<HookNotification.HookN private EntityPartialUpdateRequest() { } - public EntityPartialUpdateRequest(String typeName, String attribute, String attributeValue, + public EntityPartialUpdateRequest(String user, String typeName, String attribute, String attributeValue, Referenceable entity) { - super(HookNotificationType.ENTITY_PARTIAL_UPDATE); + super(HookNotificationType.ENTITY_PARTIAL_UPDATE, user); this.typeName = typeName; this.attribute = attribute; this.attributeValue = attributeValue; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java index b3d4721..02255a7 100644 --- a/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java @@ -19,6 +19,7 @@ package org.apache.atlas.notification; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasServiceException; +import org.apache.hadoop.security.UserGroupInformation; import org.testng.annotations.Test; import static org.mockito.Mockito.*; @@ -29,10 +30,15 @@ public class NotificationHookConsumerTest { @Test public void testConsumerCanProceedIfServerIsReady() throws InterruptedException, AtlasServiceException { - AtlasClient atlasClient = mock(AtlasClient.class); + final AtlasClient atlasClient = mock(AtlasClient.class); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new 
HookConsumer(atlasClient, mock(NotificationConsumer.class)); + notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { + @Override + protected AtlasClient getAtlasClient(UserGroupInformation ugi) { + return atlasClient; + } + }; NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); when(atlasClient.isServerReady()).thenReturn(true); @@ -43,10 +49,15 @@ public class NotificationHookConsumerTest { @Test public void testConsumerWaitsNTimesIfServerIsNotReadyNTimes() throws AtlasServiceException, InterruptedException { - AtlasClient atlasClient = mock(AtlasClient.class); + final AtlasClient atlasClient = mock(AtlasClient.class); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class)); + notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { + @Override + protected AtlasClient getAtlasClient(UserGroupInformation ugi) { + return atlasClient; + } + }; NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); when(atlasClient.isServerReady()).thenReturn(false, false, false, true); @@ -57,10 +68,15 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException, InterruptedException { - AtlasClient atlasClient = mock(AtlasClient.class); + final AtlasClient atlasClient = mock(AtlasClient.class); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class)); + notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { + @Override + protected AtlasClient getAtlasClient(UserGroupInformation ugi) { + return atlasClient; + } + }; NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); doThrow(new InterruptedException()).when(timer).sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS); when(atlasClient.isServerReady()).thenReturn(false); @@ -70,10 +86,15 @@ public class NotificationHookConsumerTest { @Test public void testConsumerProceedsWithFalseOnAtlasServiceException() throws AtlasServiceException { - AtlasClient atlasClient = mock(AtlasClient.class); + final AtlasClient atlasClient = mock(AtlasClient.class); NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(); NotificationHookConsumer.HookConsumer hookConsumer = - notificationHookConsumer.new HookConsumer(atlasClient, mock(NotificationConsumer.class)); + notificationHookConsumer.new HookConsumer(mock(NotificationConsumer.class)) { + @Override + protected AtlasClient getAtlasClient(UserGroupInformation ugi) { + return atlasClient; + } + }; NotificationHookConsumer.Timer timer = mock(NotificationHookConsumer.Timer.class); when(atlasClient.isServerReady()).thenThrow(new AtlasServiceException(AtlasClient.API.VERSION, new Exception())); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/bca454e1/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java ---------------------------------------------------------------------- diff --git a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java index 1dedb5b..11b7a53 
100644 --- a/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java +++ b/notification/src/test/java/org/apache/atlas/notification/hook/HookNotificationTest.java @@ -19,49 +19,74 @@ package org.apache.atlas.notification.hook; import org.apache.atlas.notification.AbstractNotificationConsumer; import org.apache.atlas.typesystem.Referenceable; -import org.apache.atlas.typesystem.json.InstanceSerialization; -import org.codehaus.jettison.json.JSONArray; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; public class HookNotificationTest { - - @Test - public void testMessageBackwardCompatibility() throws Exception { - JSONArray jsonArray = new JSONArray(); - Referenceable entity = new Referenceable("sometype"); - entity.set("name", "somename"); - String entityJson = InstanceSerialization.toJson(entity, true); - jsonArray.put(entityJson); - - HookNotification.HookNotificationMessage notification = AbstractNotificationConsumer.GSON.fromJson( - jsonArray.toString(), HookNotification.HookNotificationMessage.class); - assertNotNull(notification); - assertEquals(notification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE); - HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) notification; - assertEquals(createRequest.getEntities().size(), 1); - assertEquals(createRequest.getEntities().get(0).getTypeName(), entity.getTypeName()); - } - @Test public void testNewMessageSerDe() throws Exception { Referenceable entity1 = new Referenceable("sometype"); entity1.set("attr", "value"); entity1.set("complex", new Referenceable("othertype")); Referenceable entity2 = new Referenceable("newtype"); - HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(entity1, entity2); + String user = "user"; + HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(user, entity1, entity2); String notificationJson = AbstractNotificationConsumer.GSON.toJson(request); HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson( notificationJson, HookNotification.HookNotificationMessage.class); assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE); + assertEquals(actualNotification.getUser(), user); + HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) actualNotification; assertEquals(createRequest.getEntities().size(), 2); + Referenceable actualEntity1 = createRequest.getEntities().get(0); assertEquals(actualEntity1.getTypeName(), "sometype"); assertEquals(((Referenceable)actualEntity1.get("complex")).getTypeName(), "othertype"); assertEquals(createRequest.getEntities().get(1).getTypeName(), "newtype"); } + + @Test + public void testBackwardCompatibility() throws Exception { + /** + Referenceable entity = new Referenceable("sometype"); + entity.set("attr", "value"); + String user = "user"; + HookNotification.EntityCreateRequest request = new HookNotification.EntityCreateRequest(null, entity); + + String notificationJson = AbstractNotificationConsumer.GSON.toJson(request); + System.out.println(notificationJson); + **/ + + //Json without user and assert that the string can be deserialised + String notificationJson = "{\n" + + " \"entities\": [\n" + + " {\n" + + " \"jsonClass\": 
\"org.apache.atlas.typesystem.json.InstanceSerialization$_Reference\",\n" + + " \"id\": {\n" + + " \"jsonClass\": \"org.apache.atlas.typesystem.json.InstanceSerialization$_Id\",\n" + + " \"id\": \"-1457685864305243000\",\n" + + " \"version\": 0,\n" + + " \"typeName\": \"sometype\"\n" + + " },\n" + + " \"typeName\": \"sometype\",\n" + + " \"values\": {\n" + + " \"attr\": \"value\"\n" + + " },\n" + + " \"traitNames\": [],\n" + + " \"traits\": {}\n" + + " }\n" + + " ],\n" + + " \"type\": \"ENTITY_CREATE\"\n" + + "}"; + + HookNotification.HookNotificationMessage actualNotification = AbstractNotificationConsumer.GSON.fromJson( + notificationJson, HookNotification.HookNotificationMessage.class); + assertEquals(actualNotification.getType(), HookNotification.HookNotificationType.ENTITY_CREATE); + assertNull(actualNotification.user); + assertEquals(actualNotification.getUser(), HookNotification.HookNotificationMessage.UNKNOW_USER); + } }
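
For hook authors picking up this change: every HookNotificationMessage now carries the user it originated from, so the request constructors take the user as their first argument and AtlasHook resolves it through getUser() (explicit name, then ugi.getShortUserName(), then UserGroupInformation.getCurrentUser(), then the user.name system property). A minimal hook-side sketch against the constructors shown in the diff above; ExampleHook, the hive_table entity, the owner attribute and the retry property key are illustrative placeholders, not part of this commit:

import java.util.ArrayList;
import java.util.List;

import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;

public class ExampleHook extends AtlasHook {

    // Hypothetical property key; each bridge defines its own retry setting.
    @Override
    protected String getNumberOfRetriesPropertyKey() {
        return "atlas.hook.example.numRetries";
    }

    public void onTableCreated() {
        // Resolve the acting user once and stamp it on every message.
        String user = getUser();

        Referenceable table = new Referenceable("hive_table");
        table.set("name", "sales");

        List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
        messages.add(new HookNotification.EntityCreateRequest(user, table));

        // Partial updates also take the user first, followed by the unique-attribute
        // lookup (typeName, attribute, attributeValue) and the values to apply.
        Referenceable update = new Referenceable("hive_table");
        update.set("owner", "etl");
        messages.add(new HookNotification.EntityPartialUpdateRequest(user, "hive_table",
                "name", "sales", update));

        // The retry count is read from getNumberOfRetriesPropertyKey(), defaulting to 3.
        notifyEntities(messages);
    }
}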
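
On the consumer side, the user carried in the message is wrapped in a proxy UserGroupInformation so that REST calls to the server are made on behalf of the originating user rather than the consumer process. A minimal sketch mirroring getAtlasClient() above; the endpoint constant stands in for the value read from application properties in start():

import org.apache.atlas.AtlasClient;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.hadoop.security.UserGroupInformation;

public class PerUserClientSketch {

    // Placeholder for the Atlas endpoint resolved from configuration.
    private static final String ATLAS_ENDPOINT = "http://localhost:21000";

    static AtlasClient clientFor(HookNotification.HookNotificationMessage message) {
        // getUser() falls back to UNKNOWN for old messages that carry no user field.
        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(message.getUser());
        return new AtlasClient(ATLAS_ENDPOINT, ugi, ugi.getShortUserName());
    }
}

Keeping the construction behind the protected getAtlasClient(ugi) method is also what lets the updated NotificationHookConsumerTest cases swap in a mocked client through an anonymous HookConsumer subclass, now that the constructor no longer takes an AtlasClient.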
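
Since notifyEntities(List, int) is now public and static, code that cannot extend AtlasHook can still push messages directly with an explicit retry count. A small sketch; the entity type, attribute values and the retry count of 3 are only illustrative:

import java.util.ArrayList;
import java.util.List;

import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.typesystem.Referenceable;

public class DirectNotifySketch {
    public static void main(String[] args) {
        Referenceable process = new Referenceable("sometype");
        process.set("name", "nightly-load");

        List<HookNotification.HookNotificationMessage> messages = new ArrayList<>();
        messages.add(new HookNotification.EntityCreateRequest(AtlasHook.getUser(), process));

        // Up to three attempts against the messaging system.
        AtlasHook.notifyEntities(messages, 3);
    }
}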
