Repository: incubator-atlas
Updated Branches:
  refs/heads/master 009330de2 -> 46365f8c4
ATLAS-527 Support lineage for load table, import, export (sumasai via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/46365f8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/46365f8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/46365f8c

Branch: refs/heads/master
Commit: 46365f8c484b06fe4f2ef681d0f8533c698820ff
Parents: 009330d
Author: Suma Shivaprasad <[email protected]>
Authored: Thu Apr 7 09:00:47 2016 -0700
Committer: Suma Shivaprasad <[email protected]>
Committed: Thu Apr 7 09:00:47 2016 -0700

----------------------------------------------------------------------
 .../atlas/fs/model/FSDataModelGenerator.java    |   3 +
 .../org/apache/atlas/fs/model/FSDataModel.scala |   5 +-
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  |  58 ++-
 .../org/apache/atlas/hive/hook/HiveHook.java    | 400 ++++++++++++-------
 .../org/apache/atlas/hive/hook/HiveHookIT.java  | 253 ++++++++++--
 .../apache/atlas/storm/hook/StormAtlasHook.java |   6 +-
 .../main/java/org/apache/atlas/AtlasClient.java |   3 +
 release-log.txt                                 |   1 +
 .../atlas/services/ReservedTypesRegistrar.java  |  12 +
 9 files changed, 540 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java
----------------------------------------------------------------------
diff --git a/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java b/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java
index 555d565..444c1aa 100644
--- a/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java
+++ b/addons/hdfs-model/src/main/java/org/apache/atlas/fs/model/FSDataModelGenerator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.atlas.fs.model;

+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.addons.ModelDefinitionDump;
 import org.apache.atlas.typesystem.TypesDef;
 import org.apache.atlas.typesystem.json.TypesSerialization;


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala
----------------------------------------------------------------------
diff --git a/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala b/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala
index c964f73..c380a92 100644
--- a/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala
+++ b/addons/hdfs-model/src/main/scala/org/apache/atlas/fs/model/FSDataModel.scala
@@ -31,13 +31,10 @@ import scala.tools.scalap.scalax.rules.scalasig.ClassFileParser.EnumConstValue
  */
 object FSDataModel extends App {

-    var typesDef : TypesDef = null
-    val typesBuilder = new TypesBuilder
-    import typesBuilder._
-    typesDef = types {
+    val typesDef : TypesDef = types {

         // FS DataSet
         _class(FSDataTypes.FS_PATH.toString, List("DataSet", AtlasClient.REFERENCEABLE_SUPER_TYPE)) {


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git
a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java index 3a802d7..eb5f1e6 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java @@ -18,18 +18,22 @@ package org.apache.atlas.hive.bridge; -import com.google.common.base.Joiner; +import com.google.common.annotations.VisibleForTesting; import com.sun.jersey.api.client.ClientResponse; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasConstants; import org.apache.atlas.AtlasServiceException; +import org.apache.atlas.fs.model.FSDataModel; +import org.apache.atlas.fs.model.FSDataTypes; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.atlas.typesystem.json.TypesSerialization; import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -67,6 +71,9 @@ public class HiveMetaStoreBridge { public static final String ATLAS_ENDPOINT = "atlas.rest.address"; + private final String doAsUser; + private final UserGroupInformation ugi; + private static final Logger LOG = LoggerFactory.getLogger(HiveMetaStoreBridge.class); public final Hive hiveClient; @@ -82,6 +89,11 @@ public class HiveMetaStoreBridge { this(hiveConf, atlasConf, null, null); } + @VisibleForTesting + HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) { + this(clusterName, hiveClient, atlasClient, null, null); + } + public String getClusterName() { return clusterName; } @@ -96,21 +108,16 @@ public class HiveMetaStoreBridge { UserGroupInformation ugi) throws Exception { this(hiveConf.get(HIVE_CLUSTER_NAME, DEFAULT_CLUSTER_NAME), Hive.get(hiveConf), - atlasConf, doAsUser, ugi); - } - - HiveMetaStoreBridge(String clusterName, Hive hiveClient, - Configuration atlasConf, String doAsUser, UserGroupInformation ugi) { - this.clusterName = clusterName; - this.hiveClient = hiveClient; - String baseUrls = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL); - this.atlasClient = new AtlasClient(ugi, doAsUser, baseUrls.split(",")); + new AtlasClient(atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL), ugi, doAsUser), doAsUser, ugi); } - HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient) { + @VisibleForTesting + HiveMetaStoreBridge(String clusterName, Hive hiveClient, AtlasClient atlasClient, String user, UserGroupInformation ugi) { this.clusterName = clusterName; this.hiveClient = hiveClient; this.atlasClient = atlasClient; + this.doAsUser = user; + this.ugi = ugi; } private AtlasClient getAtlasClient() { @@ -306,7 +313,7 @@ public class HiveMetaStoreBridge { } private Referenceable createOrUpdateTableInstance(Referenceable dbReference, Referenceable tableReference, - Table hiveTable) throws Exception { + final Table hiveTable) throws Exception { LOG.info("Importing objects from {}.{}", hiveTable.getDbName(), hiveTable.getTableName()); if (tableReference == null) { @@ -348,6 +355,7 @@ public class 
HiveMetaStoreBridge { tableReference.set(TABLE_TYPE_ATTR, hiveTable.getTableType().name()); tableReference.set("temporary", hiveTable.isTemporary()); + return tableReference; } @@ -453,6 +461,17 @@ public class HiveMetaStoreBridge { return sdReferenceable; } + public Referenceable fillHDFSDataSet(String pathUri) { + Referenceable ref = new Referenceable(FSDataTypes.HDFS_PATH().toString()); + ref.set("path", pathUri); +// Path path = new Path(pathUri); +// ref.set("name", path.getName()); +// TODO - Fix after ATLAS-542 to shorter Name + ref.set("name", pathUri); + ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri); + return ref; + } + public static String getColumnQualifiedName(final String tableQualifiedName, final String colName) { final String[] parts = tableQualifiedName.split("@"); final String tableName = parts[0]; @@ -488,6 +507,21 @@ public class HiveMetaStoreBridge { AtlasClient dgiClient = getAtlasClient(); try { + dgiClient.getType(FSDataTypes.HDFS_PATH().toString()); + LOG.info("HDFS data model is already registered!"); + } catch(AtlasServiceException ase) { + if (ase.getStatus() == ClientResponse.Status.NOT_FOUND) { + //Trigger val definition + FSDataModel.main(null); + + final String hdfsModelJson = TypesSerialization.toJson(FSDataModel.typesDef()); + //Expected in case types do not exist + LOG.info("Registering HDFS data model : " + hdfsModelJson); + dgiClient.createType(hdfsModelJson); + } + } + + try { dgiClient.getType(HiveDataTypes.HIVE_PROCESS.getName()); LOG.info("Hive data model is already registered!"); } catch(AtlasServiceException ase) { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java ---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java index 68e32ff..4102263 100755 --- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java +++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java @@ -27,7 +27,10 @@ import org.apache.atlas.hook.AtlasHook; import org.apache.atlas.notification.hook.HookNotification; import org.apache.atlas.typesystem.Referenceable; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.ExplainTask; @@ -38,18 +41,20 @@ import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.HiveOperation; + import org.apache.hadoop.security.UserGroupInformation; import org.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.MalformedURLException; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -86,18 +91,108 @@ public 
class HiveHook extends AtlasHook implements ExecuteWithHookContext { private static final long keepAliveTimeDefault = 10; private static final int queueSizeDefault = 10000; - class HiveEvent { - public Set<ReadEntity> inputs; - public Set<WriteEntity> outputs; - - public String user; - public UserGroupInformation ugi; - public HiveOperation operation; - public HookContext.HookType hookType; - public JSONObject jsonPlan; - public String queryId; - public String queryStr; - public Long queryStartTime; + static class HiveEventContext { + private Set<ReadEntity> inputs; + private Set<WriteEntity> outputs; + + private String user; + private UserGroupInformation ugi; + private HiveOperation operation; + private HookContext.HookType hookType; + private org.json.JSONObject jsonPlan; + private String queryId; + private String queryStr; + private Long queryStartTime; + + private String queryType; + + public void setInputs(Set<ReadEntity> inputs) { + this.inputs = inputs; + } + + public void setOutputs(Set<WriteEntity> outputs) { + this.outputs = outputs; + } + + public void setUser(String user) { + this.user = user; + } + + public void setUgi(UserGroupInformation ugi) { + this.ugi = ugi; + } + + public void setOperation(HiveOperation operation) { + this.operation = operation; + } + + public void setHookType(HookContext.HookType hookType) { + this.hookType = hookType; + } + + public void setJsonPlan(JSONObject jsonPlan) { + this.jsonPlan = jsonPlan; + } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public void setQueryStr(String queryStr) { + this.queryStr = queryStr; + } + + public void setQueryStartTime(Long queryStartTime) { + this.queryStartTime = queryStartTime; + } + + public void setQueryType(String queryType) { + this.queryType = queryType; + } + + public Set<ReadEntity> getInputs() { + return inputs; + } + + public Set<WriteEntity> getOutputs() { + return outputs; + } + + public String getUser() { + return user; + } + + public UserGroupInformation getUgi() { + return ugi; + } + + public HiveOperation getOperation() { + return operation; + } + + public HookContext.HookType getHookType() { + return hookType; + } + + public org.json.JSONObject getJsonPlan() { + return jsonPlan; + } + + public String getQueryId() { + return queryId; + } + + public String getQueryStr() { + return queryStr; + } + + public Long getQueryStartTime() { + return queryStartTime; + } + + public String getQueryType() { + return queryType; + } } private List<HookNotification.HookNotificationMessage> messages = new ArrayList<>(); @@ -114,22 +209,22 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { int queueSize = atlasProperties.getInt(QUEUE_SIZE, queueSizeDefault); executor = new ThreadPoolExecutor(minThreads, maxThreads, keepAliveTime, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>(queueSize), - new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build()); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - executor.shutdown(); - executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS); - executor = null; - } catch (InterruptedException ie) { - LOG.info("Interrupt received in shutdown."); - } - // shutdown client + new LinkedBlockingQueue<Runnable>(queueSize), + new ThreadFactoryBuilder().setNameFormat("Atlas Logger %d").build()); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + executor.shutdown(); + executor.awaitTermination(WAIT_TIME, 
TimeUnit.SECONDS); + executor = null; + } catch (InterruptedException ie) { + LOG.info("Interrupt received in shutdown."); } - }); + // shutdown client + } + }); setupOperationMap(); } catch (Exception e) { @@ -156,21 +251,21 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { @Override public void run(final HookContext hookContext) throws Exception { // clone to avoid concurrent access - final HiveEvent event = new HiveEvent(); - final HiveConf conf = new HiveConf(hookContext.getConf()); - - event.inputs = hookContext.getInputs(); - event.outputs = hookContext.getOutputs(); - event.user = getUser(hookContext.getUserName(), hookContext.getUgi()); - event.ugi = hookContext.getUgi(); - event.operation = OPERATION_MAP.get(hookContext.getOperationName()); - event.hookType = hookContext.getHookType(); - event.queryId = hookContext.getQueryPlan().getQueryId(); - event.queryStr = hookContext.getQueryPlan().getQueryStr(); - event.queryStartTime = hookContext.getQueryPlan().getQueryStartTime(); + final HiveConf conf = new HiveConf(hookContext.getConf()); - event.jsonPlan = getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan()); + final HiveEventContext event = new HiveEventContext(); + event.setInputs(hookContext.getInputs()); + event.setOutputs(hookContext.getOutputs()); + event.setJsonPlan(getQueryPlan(hookContext.getConf(), hookContext.getQueryPlan())); + event.setHookType(hookContext.getHookType()); + event.setUgi(hookContext.getUgi()); + event.setUser(hookContext.getUserName()); + event.setOperation(OPERATION_MAP.get(hookContext.getOperationName())); + event.setQueryId(hookContext.getQueryPlan().getQueryId()); + event.setQueryStr(hookContext.getQueryPlan().getQueryStr()); + event.setQueryStartTime(hookContext.getQueryPlan().getQueryStartTime()); + event.setQueryType(hookContext.getQueryPlan().getQueryPlan().getQueryType()); boolean sync = conf.get(CONF_SYNC, "false").equals("true"); if (sync) { @@ -189,20 +284,21 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } } - private void fireAndForget(HiveEvent event) throws Exception { - assert event.hookType == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!"; + private void fireAndForget(HiveEventContext event) throws Exception { + assert event.getHookType() == HookContext.HookType.POST_EXEC_HOOK : "Non-POST_EXEC_HOOK not supported!"; - LOG.info("Entered Atlas hook for hook type {} operation {}", event.hookType, event.operation); + LOG.info("Entered Atlas hook for hook type {} operation {}", event.getHookType(), event.getOperation()); - HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.user, event.ugi); + HiveMetaStoreBridge dgiBridge = new HiveMetaStoreBridge(hiveConf, atlasProperties, event.getUser(), event.getUgi()); - switch (event.operation) { + switch (event.getOperation()) { case CREATEDATABASE: handleEventOutputs(dgiBridge, event, Type.DATABASE); break; case CREATETABLE: - handleEventOutputs(dgiBridge, event, Type.TABLE); + List<Pair<? 
extends Entity, Referenceable>> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE); + handleExternalTables(dgiBridge, event, tablesCreated.get(0).getLeft(), tablesCreated.get(0).getRight()); break; case CREATETABLE_AS_SELECT: @@ -221,25 +317,26 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { break; case ALTERTABLE_FILEFORMAT: - case ALTERTABLE_LOCATION: case ALTERTABLE_CLUSTER_SORT: case ALTERTABLE_BUCKETNUM: case ALTERTABLE_PROPERTIES: case ALTERVIEW_PROPERTIES: case ALTERTABLE_SERDEPROPERTIES: case ALTERTABLE_SERIALIZER: - alterTable(dgiBridge, event); - break; - case ALTERTABLE_ADDCOLS: case ALTERTABLE_REPLACECOLS: case ALTERTABLE_RENAMECOL: - alterTable(dgiBridge, event); + handleEventOutputs(dgiBridge, event, Type.TABLE); break; - + case ALTERTABLE_LOCATION: + List<Pair<? extends Entity, Referenceable>> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE); + if (tablesUpdated != null && tablesUpdated.size() > 0) { + //Track altered lineage in case of external tables + handleExternalTables(dgiBridge, event, tablesUpdated.get(0).getLeft(), tablesUpdated.get(0).getRight()); + } case ALTERDATABASE: case ALTERDATABASE_OWNER: - alterDatabase(dgiBridge, event); + handleEventOutputs(dgiBridge, event, Type.DATABASE); break; default: @@ -248,61 +345,37 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { notifyEntities(messages); } - private void alterDatabase(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { - assert event.outputs != null && event.outputs.size() > 0; - - for (WriteEntity writeEntity : event.outputs) { - if (writeEntity.getType() == Type.DATABASE) { - //Create/update table entity - createOrUpdateEntities(dgiBridge, event.user, writeEntity); - } - } - } - - private void alterTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { - assert event.inputs != null && event.inputs.size() == 1; - assert event.outputs != null && event.outputs.size() > 0; - - for (WriteEntity writeEntity : event.outputs) { - //Below check should filter out partition related ddls - if (writeEntity.getType() == Entity.Type.TABLE) { - //Create/update table entity - createOrUpdateEntities(dgiBridge, event.user, writeEntity); - } - } - } - - private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { + private void renameTable(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception { //crappy, no easy of getting new name - assert event.inputs != null && event.inputs.size() == 1; - assert event.outputs != null && event.outputs.size() > 0; + assert event.getInputs() != null && event.getInputs().size() == 1; + assert event.getOutputs() != null && event.getOutputs().size() > 0; //Update entity if not exists - ReadEntity oldEntity = event.inputs.iterator().next(); + ReadEntity oldEntity = event.getInputs().iterator().next(); Table oldTable = oldEntity.getTable(); - for (WriteEntity writeEntity : event.outputs) { + for (WriteEntity writeEntity : event.getOutputs()) { if (writeEntity.getType() == Entity.Type.TABLE) { Table newTable = writeEntity.getTable(); if (newTable.getDbName().equals(oldTable.getDbName()) && !newTable.getTableName() .equals(oldTable.getTableName())) { //Create/update old table entity - create new entity and replace id - Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.user, writeEntity); + Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity); String oldQualifiedName = 
dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), - oldTable.getDbName(), oldTable.getTableName()); + oldTable.getDbName(), oldTable.getTableName()); tableEntity.set(HiveDataModelGenerator.NAME, oldQualifiedName); tableEntity.set(HiveDataModelGenerator.TABLE_NAME, oldTable.getTableName().toLowerCase()); String newQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), - newTable.getDbName(), newTable.getTableName()); + newTable.getDbName(), newTable.getTableName()); Referenceable newEntity = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); newEntity.set(HiveDataModelGenerator.NAME, newQualifiedName); newEntity.set(HiveDataModelGenerator.TABLE_NAME, newTable.getTableName().toLowerCase()); - messages.add(new HookNotification.EntityPartialUpdateRequest(event.user, - HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME, - oldQualifiedName, newEntity)); + messages.add(new HookNotification.EntityPartialUpdateRequest(event.getUser(), + HiveDataTypes.HIVE_TABLE.getName(), HiveDataModelGenerator.NAME, + oldQualifiedName, newEntity)); } } } @@ -346,12 +419,17 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return tableEntity; } - private void handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEvent event, Type entityType) throws Exception { - for (WriteEntity entity : event.outputs) { + private List<Pair<? extends Entity, Referenceable>> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception { + List<Pair<? extends Entity, Referenceable>> entitiesCreatedOrUpdated = new ArrayList<>(); + for (Entity entity : event.getOutputs()) { if (entity.getType() == entityType) { - createOrUpdateEntities(dgiBridge, event.user, entity); + Referenceable entityCreatedOrUpdated = createOrUpdateEntities(dgiBridge, event.getUser(), entity); + if (entitiesCreatedOrUpdated != null) { + entitiesCreatedOrUpdated.add(Pair.of(entity, entityCreatedOrUpdated)); + } } } + return entitiesCreatedOrUpdated; } private String normalize(String str) { @@ -361,9 +439,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return str.toLowerCase().trim(); } - private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEvent event) throws Exception { - Set<ReadEntity> inputs = event.inputs; - Set<WriteEntity> outputs = event.outputs; + private void registerProcess(HiveMetaStoreBridge dgiBridge, HiveEventContext event) throws Exception { + Set<ReadEntity> inputs = event.getInputs(); + Set<WriteEntity> outputs = event.getOutputs(); //Even explain CTAS has operation name as CREATETABLE_AS_SELECT if (inputs.isEmpty() && outputs.isEmpty()) { @@ -371,64 +449,54 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { return; } - if (event.queryId == null) { - LOG.info("Query id/plan is missing for {}" , event.queryStr); + if (event.getQueryId() == null) { + LOG.info("Query id/plan is missing for {}", event.getQueryStr()); } - String queryStr = normalize(event.queryStr); - LOG.debug("Registering query: {}", queryStr); - - Map<String, Referenceable> source = new LinkedHashMap<>(); - Map<String, Referenceable> target = new LinkedHashMap<>(); + final Map<String, Referenceable> source = new LinkedHashMap<>(); + final Map<String, Referenceable> target = new LinkedHashMap<>(); boolean isSelectQuery = isSelectQuery(event); // Also filter out select queries which do not modify data if (!isSelectQuery) { - for (ReadEntity readEntity : inputs) { - if (readEntity.getType() 
== Type.TABLE || readEntity.getType() == Type.PARTITION) { - final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(),readEntity.getTable().getDbName(), readEntity.getTable().getTableName()); - if (!source.containsKey(tblQFName)) { - Referenceable inTable = createOrUpdateEntities(dgiBridge, event.user, readEntity); - source.put(tblQFName, inTable); - } - } + for (ReadEntity readEntity : event.getInputs()) { + processHiveEntity(dgiBridge, event, readEntity, source); } - for (WriteEntity writeEntity : outputs) { - if (writeEntity.getType() == Type.TABLE || writeEntity.getType() == Type.PARTITION) { - Referenceable outTable = createOrUpdateEntities(dgiBridge, event.user, writeEntity); - final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), writeEntity.getTable().getDbName(), writeEntity.getTable().getTableName()); - if (!target.containsKey(tblQFName)) { - target.put(tblQFName, outTable); - } - } + for (WriteEntity writeEntity : event.getOutputs()) { + processHiveEntity(dgiBridge, event, writeEntity, target); } if (source.size() > 0 || target.size() > 0) { - Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName()); - - List<Referenceable> sourceList = new ArrayList<>(source.values()); - List<Referenceable> targetList = new ArrayList<>(target.values()); - - //The serialization code expected a list - processReferenceable.set("inputs", sourceList); - processReferenceable.set("outputs", targetList); - processReferenceable.set("name", queryStr); - processReferenceable.set("operationType", event.operation.getOperationName()); - processReferenceable.set("startTime", event.queryStartTime); - processReferenceable.set("userName", event.user); - processReferenceable.set("queryText", queryStr); - processReferenceable.set("queryId", event.queryId); - processReferenceable.set("queryPlan", event.jsonPlan.toString()); - processReferenceable.set("endTime", System.currentTimeMillis()); - //TODO set queryGraph - messages.add(new HookNotification.EntityCreateRequest(event.user, processReferenceable)); + Referenceable processReferenceable = getProcessReferenceable(event, + new ArrayList<Referenceable>() {{ + addAll(source.values()); + }}, + new ArrayList<Referenceable>() {{ + addAll(target.values()); + }}); + messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable)); } else { - LOG.info("Skipped query {} since it has no inputs or resulting outputs", queryStr); + LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr()); } } else { - LOG.info("Skipped query {} for processing since it is a select query ", queryStr); + LOG.info("Skipped query {} for processing since it is a select query ", event.getQueryStr()); + } + } + + private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets) throws Exception { + if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) { + final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable().getDbName(), entity.getTable().getTableName()); + if (!dataSets.containsKey(tblQFName)) { + Referenceable inTable = createOrUpdateEntities(dgiBridge, event.getUser(), entity); + dataSets.put(tblQFName, inTable); + } + } else if (entity.getType() == Type.DFS_DIR) { + final String pathUri = normalize(new Path(entity.getLocation()).toString()); + LOG.info("Registering DFS Path {} ", pathUri); + 
Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri); + dataSets.put(pathUri, hdfsPath); } } @@ -444,13 +512,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } } - private boolean isSelectQuery(HiveEvent event) { - if (event.operation == HiveOperation.QUERY) { - Set<WriteEntity> outputs = event.outputs; + private boolean isSelectQuery(HiveEventContext event) { + if (event.getOperation() == HiveOperation.QUERY) { + Set<WriteEntity> outputs = event.getOutputs(); //Select query has only one output - if (outputs.size() == 1) { - WriteEntity output = outputs.iterator().next(); + if (event.getOutputs().size() == 1) { + WriteEntity output = event.getOutputs().iterator().next(); /* Strangely select queries have DFS_DIR as the type which seems like a bug in hive. Filter out by checking if the path is a temporary URI * Insert into/overwrite queries onto local or dfs paths have DFS_DIR or LOCAL_DIR as the type and WriteType.PATH_WRITE and tempUri = false * Insert into a temporary table has isTempURI = false. So will not skip as expected @@ -465,4 +533,50 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext { } return false; } + + private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final Entity entity, final Referenceable tblRef) throws HiveException, MalformedURLException { + Table hiveTable = entity.getTable(); + //Refresh to get the correct location + hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName()); + + final String location = normalize(hiveTable.getDataLocation().toString()); + if (hiveTable != null && TableType.EXTERNAL_TABLE.equals(hiveTable.getTableType())) { + LOG.info("Registering external table process {} ", event.getQueryStr()); + List<Referenceable> inputs = new ArrayList<Referenceable>() {{ + add(dgiBridge.fillHDFSDataSet(location)); + }}; + + List<Referenceable> outputs = new ArrayList<Referenceable>() {{ + add(tblRef); + }}; + + Referenceable processReferenceable = getProcessReferenceable(event, inputs, outputs); + messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable)); + } + } + + private Referenceable getProcessReferenceable(HiveEventContext hiveEvent, List<Referenceable> sourceList, List<Referenceable> targetList) { + Referenceable processReferenceable = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName()); + + String queryStr = normalize(hiveEvent.getQueryStr()); + LOG.debug("Registering query: {}", queryStr); + + //The serialization code expected a list + if (sourceList != null || !sourceList.isEmpty()) { + processReferenceable.set("inputs", sourceList); + } + if (targetList != null || !targetList.isEmpty()) { + processReferenceable.set("outputs", targetList); + } + processReferenceable.set("name", queryStr); + processReferenceable.set("operationType", hiveEvent.getOperation().getOperationName()); + processReferenceable.set("startTime", hiveEvent.getQueryStartTime()); + processReferenceable.set("userName", hiveEvent.getUser()); + processReferenceable.set("queryText", queryStr); + processReferenceable.set("queryId", hiveEvent.getQueryId()); + processReferenceable.set("queryPlan", hiveEvent.getJsonPlan()); + processReferenceable.set("endTime", System.currentTimeMillis()); + //TODO set queryGraph + return processReferenceable; + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java 
---------------------------------------------------------------------- diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java index e17afb8..8ef8479 100755 --- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java +++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java @@ -22,16 +22,19 @@ import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; +import org.apache.atlas.fs.model.FSDataTypes; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataModelGenerator; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.typesystem.Referenceable; import org.apache.atlas.typesystem.Struct; +import org.apache.atlas.typesystem.persistence.Id; import org.apache.atlas.utils.ParamChecker; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.lang.StringEscapeUtils; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.ql.Driver; @@ -61,6 +64,9 @@ public class HiveHookIT { private Driver driver; private AtlasClient dgiCLient; private SessionState ss; + + private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS; + private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; private enum QUERY_TYPE { GREMLIN, @@ -81,9 +87,11 @@ public class HiveHookIT { SessionState.setCurrentSessionState(ss); Configuration configuration = ApplicationProperties.get(); + dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL)); + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, configuration); hiveMetaStoreBridge.registerHiveDataModel(); - dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL)); + } private void runCommand(String cmd) throws Exception { @@ -145,10 +153,15 @@ public class HiveHookIT { return tableName; } - private String createTable(boolean isPartitioned, boolean isTemporary) throws Exception { + private String createTable(boolean isExternal, boolean isPartitioned, boolean isTemporary) throws Exception { String tableName = tableName(); - runCommand("create " + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? - " partitioned by(dt string)" : "")); + + String location = ""; + if (isExternal) { + location = " location '" + createTestDFSPath("someTestPath") + "'"; + } + runCommand("create " + (isExternal ? " EXTERNAL " : "") + (isTemporary ? "TEMPORARY " : "") + "table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? 
+ " partitioned by(dt string)" : "") + location); return tableName; } @@ -182,6 +195,37 @@ public class HiveHookIT { assertDatabaseIsRegistered(DEFAULT_DB); } + @Test + public void testCreateExternalTable() throws Exception { + String tableName = tableName(); + String dbName = createDatabase(); + String colName = columnName(); + + String pFile = createTestDFSPath("parentPath"); + final String query = String.format("create EXTERNAL table %s.%s( %s, %s) location '%s'", dbName , tableName , colName + " int", "name string", pFile); + runCommand(query); + String tableId = assertTableIsRegistered(dbName, tableName); + + Referenceable processReference = validateProcess(query, 1, 1); + validateHDFSPaths(processReference, pFile, INPUTS); + validateOutputTables(processReference, tableId); + } + + private void validateOutputTables(Referenceable processReference, String... expectedTableGuids) throws Exception { + validateTables(processReference, OUTPUTS, expectedTableGuids); + } + + private void validateInputTables(Referenceable processReference, String... expectedTableGuids) throws Exception { + validateTables(processReference, INPUTS, expectedTableGuids); + } + + private void validateTables(Referenceable processReference, String attrName, String... expectedTableGuids) throws Exception { + List<Id> tableRef = (List<Id>) processReference.get(attrName); + for(int i = 0; i < expectedTableGuids.length; i++) { + Assert.assertEquals(tableRef.get(i)._getId(), expectedTableGuids[i]); + } + } + private String assertColumnIsRegistered(String colName) throws Exception { LOG.debug("Searching for column {}", colName.toLowerCase()); String query = @@ -265,9 +309,16 @@ public class HiveHookIT { Assert.assertEquals(vertices.length(), 0); } + private String createTestDFSPath(String path) throws Exception { + return "pfile://" + mkdir(path); + } + + private String createTestDFSFile(String path) throws Exception { + return "pfile://" + file(path); + } @Test - public void testLoadData() throws Exception { + public void testLoadLocalPath() throws Exception { String tableName = createTable(false); String loadFile = file("load"); @@ -278,17 +329,69 @@ public class HiveHookIT { } @Test - public void testLoadDataIntoPartition() throws Exception { + public void testLoadLocalPathIntoPartition() throws Exception { String tableName = createTable(true); String loadFile = file("load"); String query = "load data local inpath 'file://" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')"; runCommand(query); + validateProcess(query, 0, 1); + } + + @Test + public void testLoadDFSPath() throws Exception { + String tableName = createTable(true, true, false); + + String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + + String loadFile = createTestDFSFile("loadDFSFile"); + String query = "load data inpath '" + loadFile + "' into table " + tableName + " partition(dt = '2015-01-01')"; + runCommand(query); + + Referenceable processReference = validateProcess(query, 1, 1); + + validateHDFSPaths(processReference, loadFile, INPUTS); + + validateOutputTables(processReference, tableId); + } + + private Referenceable validateProcess(String query, int numInputs, int numOutputs) throws Exception { + String processId = assertProcessIsRegistered(query); + Referenceable process = dgiCLient.getEntity(processId); + if (numInputs == 0) { + Assert.assertNull(process.get(INPUTS)); + } else { + Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), numInputs); + } + + if (numOutputs == 0) { + 
Assert.assertNull(process.get(OUTPUTS)); + } else { + Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), numOutputs); + } + + return process; + } + + private Referenceable validateProcess(String query, String[] inputs, String[] outputs) throws Exception { String processId = assertProcessIsRegistered(query); Referenceable process = dgiCLient.getEntity(processId); - Assert.assertNull(process.get("inputs")); - Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1); + if (inputs == null) { + Assert.assertNull(process.get(INPUTS)); + } else { + Assert.assertEquals(((List<Referenceable>) process.get(INPUTS)).size(), inputs.length); + validateInputTables(process, inputs); + } + + if (outputs == null) { + Assert.assertNull(process.get(OUTPUTS)); + } else { + Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), outputs.length); + validateOutputTables(process, outputs); + } + + return process; } @Test @@ -299,13 +402,11 @@ public class HiveHookIT { "insert into " + insertTableName + " select id, name from " + tableName; runCommand(query); - String processId = assertProcessIsRegistered(query); - Referenceable process = dgiCLient.getEntity(processId); - Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1); - Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1); - assertTableIsRegistered(DEFAULT_DB, tableName); - assertTableIsRegistered(DEFAULT_DB, insertTableName); + String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName); + String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName); + + validateProcess(query, new String[] {inputTableId}, new String[] {opTableId}); } @Test @@ -316,10 +417,7 @@ public class HiveHookIT { "insert overwrite LOCAL DIRECTORY '" + randomLocalPath.getAbsolutePath() + "' select id, name from " + tableName; runCommand(query); - String processId = assertProcessIsRegistered(query); - Referenceable process = dgiCLient.getEntity(processId); - Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1); - Assert.assertNull(process.get("outputs")); + validateProcess(query, 1, 0); assertTableIsRegistered(DEFAULT_DB, tableName); } @@ -327,34 +425,32 @@ public class HiveHookIT { @Test public void testInsertIntoDFSDir() throws Exception { String tableName = createTable(); - String pFile = "pfile://" + mkdir("somedfspath"); + String pFile = createTestDFSPath("somedfspath"); String query = "insert overwrite DIRECTORY '" + pFile + "' select id, name from " + tableName; runCommand(query); - String processId = assertProcessIsRegistered(query); - Referenceable process = dgiCLient.getEntity(processId); - Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1); - Assert.assertNull(process.get("outputs")); + Referenceable processReference = validateProcess(query, 1, 1); + validateHDFSPaths(processReference, pFile, OUTPUTS); - assertTableIsRegistered(DEFAULT_DB, tableName); + String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + + validateInputTables(processReference, tableId); } @Test public void testInsertIntoTempTable() throws Exception { String tableName = createTable(); - String insertTableName = createTable(false, true); + String insertTableName = createTable(false, false, true); String query = "insert into " + insertTableName + " select id, name from " + tableName; runCommand(query); - String processId = assertProcessIsRegistered(query); - Referenceable process = dgiCLient.getEntity(processId); - 
Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1); - Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1); + validateProcess(query, 1, 1); - assertTableIsRegistered(DEFAULT_DB, tableName); - assertTableIsRegistered(DEFAULT_DB, insertTableName); + String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName); + String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName); + validateProcess(query, new String[] {ipTableId}, new String[] {opTableId}); } @Test @@ -365,10 +461,11 @@ public class HiveHookIT { "insert into " + insertTableName + " partition(dt = '2015-01-01') select id, name from " + tableName + " where dt = '2015-01-01'"; runCommand(query); - String processId = assertProcessIsRegistered(query); - Referenceable process = dgiCLient.getEntity(processId); - Assert.assertEquals(((List<Referenceable>) process.get("inputs")).size(), 1); - Assert.assertEquals(((List<Referenceable>) process.get("outputs")).size(), 1); + validateProcess(query, 1, 1); + + String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName); + String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName); + validateProcess(query, new String[] {ipTableId}, new String[] {opTableId}); } private String random() { @@ -390,19 +487,62 @@ public class HiveHookIT { } @Test - public void testExportImport() throws Exception { + public void testExportImportUnPartitionedTable() throws Exception { String tableName = createTable(false); + String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + String filename = "pfile://" + mkdir("export"); String query = "export table " + tableName + " to \"" + filename + "\""; runCommand(query); - assertProcessIsRegistered(query); + Referenceable processReference = validateProcess(query, 1, 1); + validateHDFSPaths(processReference, filename, OUTPUTS); + + validateInputTables(processReference, tableId); + //Import tableName = createTable(false); + tableId = assertTableIsRegistered(DEFAULT_DB, tableName); query = "import table " + tableName + " from '" + filename + "'"; runCommand(query); - assertProcessIsRegistered(query); + processReference = validateProcess(query, 1, 1); + validateHDFSPaths(processReference, filename, INPUTS); + + validateOutputTables(processReference, tableId); + + } + + @Test + public void testExportImportPartitionedTable() throws Exception { + String tableName = createTable(true); + String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + + //Add a partition + String partFile = "pfile://" + mkdir("partition"); + String query = "alter table " + tableName + " add partition (dt='2015-01-01') location '" + partFile + "'"; + runCommand(query); + + String filename = "pfile://" + mkdir("export"); + query = "export table " + tableName + " to \"" + filename + "\""; + runCommand(query); + Referenceable processReference = validateProcess(query, 1, 1); + validateHDFSPaths(processReference, filename, OUTPUTS); + + validateInputTables(processReference, tableId); + + //Import + tableName = createTable(true); + tableId = assertTableIsRegistered(DEFAULT_DB, tableName); + + query = "import table " + tableName + " from '" + filename + "'"; + runCommand(query); + processReference = validateProcess(query, 1, 1); + validateHDFSPaths(processReference, filename, INPUTS); + + validateOutputTables(processReference, tableId); + + } @Test @@ -561,8 +701,9 @@ public class HiveHookIT { @Test public void testAlterTableLocation() throws Exception { - String tableName = createTable(); - final String 
testPath = "file://" + System.getProperty("java.io.tmpdir", "/tmp") + File.pathSeparator + "testPath"; + //Its an external table, so the HDFS location should also be registered as an entity + String tableName = createTable(true, true, false); + final String testPath = createTestDFSPath("testBaseDir"); String query = "alter table " + tableName + " set location '" + testPath + "'"; runCommand(query); @@ -571,6 +712,38 @@ public class HiveHookIT { Referenceable tableRef = dgiCLient.getEntity(tableId); Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC); Assert.assertEquals(sdRef.get("location"), testPath); + + Referenceable processReference = validateProcess(query, 1, 1); + validateHDFSPaths(processReference, testPath, INPUTS); + + validateOutputTables(processReference, tableId); + + } + + private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception { + List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName); + + final String testPathNormed = normalize(new Path(testPath).toString()); + String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed); + Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId); + + Referenceable hdfsPathRef = dgiCLient.getEntity(hdfsPathId); + Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed); + Assert.assertEquals(hdfsPathRef.get("name"), testPathNormed); +// Assert.assertEquals(hdfsPathRef.get("name"), new Path(testPath).getName()); + Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed); + + return hdfsPathRef.getId()._getId(); + } + + + private String assertHDFSPathIsRegistered(String path) throws Exception { + final String typeName = FSDataTypes.HDFS_PATH().toString(); + final String parentTypeName = FSDataTypes.FS_PATH().toString(); + String gremlinQuery = + String.format("g.V.has('__typeName', '%s').has('%s.path', \"%s\").toList()", typeName, parentTypeName, + normalize(path)); + return assertEntityIsRegistered(gremlinQuery); } @Test http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java ---------------------------------------------------------------------- diff --git a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java index 5665856..70a72ef 100644 --- a/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java +++ b/addons/storm-bridge/src/main/java/org/apache/atlas/storm/hook/StormAtlasHook.java @@ -219,8 +219,10 @@ public class StormAtlasHook extends AtlasHook implements ISubmitterHook { dataSetReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, hdfsPathStr); dataSetReferenceable.set("path", hdfsPathStr); dataSetReferenceable.set("owner", stormConf.get("hdfs.kerberos.principal")); - final Path hdfsPath = new Path(hdfsPathStr); - dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName()); + //Fix after ATLAS-542 +// final Path hdfsPath = new Path(hdfsPathStr); +// dataSetReferenceable.set(AtlasClient.NAME, hdfsPath.getName()); + dataSetReferenceable.set(AtlasClient.NAME, hdfsPathStr); break; case "HiveBolt": http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git 
a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index c3b4ba9..938a0a3 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -91,6 +91,9 @@ public class AtlasClient { public static final String REFERENCEABLE_SUPER_TYPE = "Referenceable"; public static final String REFERENCEABLE_ATTRIBUTE_NAME = "qualifiedName"; + public static final String PROCESS_ATTRIBUTE_INPUTS = "inputs"; + public static final String PROCESS_ATTRIBUTE_OUTPUTS = "outputs"; + public static final String JSON_MEDIA_TYPE = MediaType.APPLICATION_JSON + "; charset=UTF-8"; public static final String UNKNOWN_STATUS = "Unknown status"; http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 377ea5c..524cd33 100644 --- a/release-log.txt +++ b/release-log.txt @@ -13,6 +13,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags) ALL CHANGES: +ATLAS-527 Support lineage for load table, import, export (sumasai via shwethags) ATLAS-572 Handle secure instance of Zookeeper for leader election.(yhemanth via sumasai) ATLAS-605 Hook Notifications for DELETE entity needs to be supported (sumasai) ATLAS-607 Add Support for delete entity through a qualifiedName (sumasai via yhemanth) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/46365f8c/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java b/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java index 430bb6b..41c0155 100644 --- a/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java +++ b/repository/src/main/java/org/apache/atlas/services/ReservedTypesRegistrar.java @@ -31,6 +31,8 @@ import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.util.Arrays; +import java.util.Comparator; public class ReservedTypesRegistrar implements IBootstrapTypesRegistrar { @@ -48,7 +50,17 @@ public class ReservedTypesRegistrar implements IBootstrapTypesRegistrar { LOG.info("No types directory {} found - not registering any reserved types", typesDirName); return; } + File[] typeDefFiles = typesDir.listFiles(); + //TODO - Enforce a dependency order among models registered by definition and not by modifiedTime as below + // Workaround - Sort by modifiedTime to get the dependency of models in the right order - first hdfs, followed by hive and hive is needed by storm, falcon models. + // Sorting them by time will ensure the right order since the modules are in the correct order in pom. + Arrays.sort(typeDefFiles, new Comparator<File>() { + public int compare(File f1, File f2) { + return Long.valueOf(f1.lastModified()).compareTo(f2.lastModified()); + } + }); + for (File typeDefFile : typeDefFiles) { try { String typeDefJSON = new String(Files.readAllBytes(typeDefFile.toPath()), StandardCharsets.UTF_8);
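
For readers who do not want to trace through the hook code above, the shape of the lineage this change emits for a "load data inpath" / import / export style command can be sketched as follows. This is a minimal, illustrative Java fragment and not part of the commit; the path URI, table qualified name, and attribute values are made-up placeholders, while the types and attribute constants (FSDataTypes.HDFS_PATH, HiveDataTypes.HIVE_PROCESS, AtlasClient.PROCESS_ATTRIBUTE_INPUTS/OUTPUTS) come from the diff itself, mirroring HiveMetaStoreBridge.fillHDFSDataSet() and HiveHook.getProcessReferenceable().

import java.util.Arrays;

import org.apache.atlas.AtlasClient;
import org.apache.atlas.fs.model.FSDataTypes;
import org.apache.atlas.hive.model.HiveDataModelGenerator;
import org.apache.atlas.hive.model.HiveDataTypes;
import org.apache.atlas.typesystem.Referenceable;

public class LoadTableLineageSketch {
    public static void main(String[] args) {
        // HDFS path DataSet, as built by HiveMetaStoreBridge.fillHDFSDataSet().
        // "name" stays the full URI until ATLAS-542 introduces a shorter display name.
        String pathUri = "hdfs://namenode:8020/data/load_dir";   // made-up location
        Referenceable hdfsPath = new Referenceable(FSDataTypes.HDFS_PATH().toString());
        hdfsPath.set("path", pathUri);
        hdfsPath.set("name", pathUri);
        hdfsPath.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri);

        // Target Hive table; assumed to be registered already, so only its qualified name is set here.
        Referenceable table = new Referenceable(HiveDataTypes.HIVE_TABLE.getName());
        table.set(HiveDataModelGenerator.NAME, "default.target_table@primary");  // made-up qualified name

        // Process entity linking the input path to the output table,
        // as in HiveHook.getProcessReferenceable().
        Referenceable process = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
        process.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, Arrays.asList(hdfsPath));
        process.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, Arrays.asList(table));
        process.set("queryText", "load data inpath '" + pathUri + "' into table target_table");
        process.set("operationType", "LOAD");                    // Hive operation name for the load command
        process.set("userName", "hive");                         // made-up user
        process.set("startTime", System.currentTimeMillis());
        process.set("endTime", System.currentTimeMillis());

        System.out.println(process);
    }
}

In the hook itself these entities are wrapped in a HookNotification.EntityCreateRequest and posted via notifyEntities(), as shown in registerProcess() and handleExternalTables() above.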
