Repository: incubator-atlas
Updated Branches:
  refs/heads/master e0536224a -> 8fefd1655
ATLAS-884 Process registration should call Entity update instead of create (sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/8fefd165
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/8fefd165
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/8fefd165

Branch: refs/heads/master
Commit: 8fefd165586f5fd8acd5c3a5786d6012dda68a5b
Parents: e053622
Author: Suma Shivaprasad <[email protected]>
Authored: Tue Jun 14 16:33:05 2016 -0700
Committer: Suma Shivaprasad <[email protected]>
Committed: Tue Jun 14 16:33:05 2016 -0700

----------------------------------------------------------------------
 .../org/apache/atlas/hive/hook/HiveHook.java   |  71 ++++++----
 .../org/apache/atlas/hive/hook/HiveHookIT.java | 135 ++++++++++++++-----
 release-log.txt                                |   1 +
 3 files changed, 148 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fefd165/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index d7fa068..0ccb18b 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -62,6 +62,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -203,9 +204,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             break;

         case CREATETABLE:
-            List<Pair<? extends Entity, Referenceable>> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
+            LinkedHashMap<Type, Referenceable> tablesCreated = handleEventOutputs(dgiBridge, event, Type.TABLE);
             if (tablesCreated.size() > 0) {
-                handleExternalTables(dgiBridge, event, tablesCreated.get(0).getLeft(), tablesCreated.get(0).getRight());
+                handleExternalTables(dgiBridge, event, tablesCreated);
             }
             break;
@@ -242,10 +243,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             renameColumn(dgiBridge, event);
             break;
         case ALTERTABLE_LOCATION:
-            List<Pair<? extends Entity, Referenceable>> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
+            LinkedHashMap<Type, Referenceable> tablesUpdated = handleEventOutputs(dgiBridge, event, Type.TABLE);
             if (tablesUpdated != null && tablesUpdated.size() > 0) {
                 //Track altered lineage in case of external tables
-                handleExternalTables(dgiBridge, event, tablesUpdated.get(0).getLeft(), tablesUpdated.get(0).getRight());
+                handleExternalTables(dgiBridge, event, tablesUpdated);
             }
             break;
         case ALTERDATABASE:
@@ -384,7 +385,8 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         //Create/update old table entity - create entity with oldQFNme and old tableName if it doesnt exist. If exists, will update
         //We always use the new entity while creating the table since some flags, attributes of the table are not set in inputEntity and Hive.getTable(oldTableName) also fails since the table doesnt exist in hive anymore
-        final Referenceable tableEntity = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true);
+        final LinkedHashMap<Type, Referenceable> tables = createOrUpdateEntities(dgiBridge, event.getUser(), writeEntity, true);
+        Referenceable tableEntity = tables.get(Type.TABLE);

         //Reset regular column QF Name to old Name and create a new partial notification request to replace old column QFName to newName to retain any existing traits
         replaceColumnQFName(event, (List<Referenceable>) tableEntity.get(HiveDataModelGenerator.COLUMNS), oldQualifiedName, newQualifiedName);
@@ -458,10 +460,11 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return newSDEntity;
     }

-    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
+    private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables, Table existTable) throws Exception {
         Database db = null;
         Table table = null;
         Partition partition = null;
+        LinkedHashMap<Type, Referenceable> result = new LinkedHashMap<>();
         List<Referenceable> entities = new ArrayList<>();

         switch (entity.getType()) {
@@ -483,7 +486,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         db = dgiBridge.hiveClient.getDatabase(db.getName());
         Referenceable dbEntity = dgiBridge.createDBInstance(db);
+
         entities.add(dbEntity);
+        result.put(Type.DATABASE, dbEntity);

         Referenceable tableEntity = null;
@@ -503,30 +508,38 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             } else {
                 tableEntity = dgiBridge.createTableInstance(dbEntity, table);
                 entities.add(tableEntity);
+                result.put(Type.TABLE, tableEntity);
             }
         }
+
         messages.add(new HookNotification.EntityUpdateRequest(user, entities));
-        return tableEntity;
+        return result;
     }

-    private Referenceable createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception{
+    private LinkedHashMap<Type, Referenceable> createOrUpdateEntities(HiveMetaStoreBridge dgiBridge, String user, Entity entity, boolean skipTempTables) throws Exception{
         return createOrUpdateEntities(dgiBridge, user, entity, skipTempTables, null);
     }

-    private List<Pair<? extends Entity, Referenceable>> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
-        List<Pair<? extends Entity, Referenceable>> entitiesCreatedOrUpdated = new ArrayList<>();
+    private LinkedHashMap<Type, Referenceable> handleEventOutputs(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Type entityType) throws Exception {
         for (Entity entity : event.getOutputs()) {
             if (entity.getType() == entityType) {
-                Referenceable entityCreatedOrUpdated = createOrUpdateEntities(dgiBridge, event.getUser(), entity, true);
-                if (entitiesCreatedOrUpdated != null) {
-                    entitiesCreatedOrUpdated.add(Pair.of(entity, entityCreatedOrUpdated));
-                }
+                return createOrUpdateEntities(dgiBridge, event.getUser(), entity, true);
+            }
+        }
+        return null;
+    }
+
+    private Entity getEntityByType(Set<? extends Entity> entities, Type entityType) {
+        for (Entity entity : entities) {
+            if (entity.getType() == entityType) {
+                return entity;
             }
         }
-        return entitiesCreatedOrUpdated;
+        return null;
     }
+
     public static String lower(String str) {
         if (StringUtils.isEmpty(str)) {
             return null;
@@ -565,17 +578,18 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         final Map<String, Referenceable> source = new LinkedHashMap<>();
         final Map<String, Referenceable> target = new LinkedHashMap<>();
+        final Set<Referenceable> entities = new LinkedHashSet<>();

         boolean isSelectQuery = isSelectQuery(event);

         // filter out select queries which do not modify data
         if (!isSelectQuery) {
             for (ReadEntity readEntity : event.getInputs()) {
-                processHiveEntity(dgiBridge, event, readEntity, source);
+                processHiveEntity(dgiBridge, event, readEntity, source, entities);
             }

             for (WriteEntity writeEntity : event.getOutputs()) {
-                processHiveEntity(dgiBridge, event, writeEntity, target);
+                processHiveEntity(dgiBridge, event, writeEntity, target, entities);
             }

             if (source.size() > 0 || target.size() > 0) {
@@ -586,7 +600,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
                     new ArrayList<Referenceable>() {{
                         addAll(target.values());
                     }});
-                messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
+
+                entities.add(processReferenceable);
+                messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), new ArrayList<Referenceable>(entities)));
             } else {
                 LOG.info("Skipped query {} since it has no getInputs() or resulting getOutputs()", event.getQueryStr());
             }
@@ -595,18 +611,20 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }
     }

-    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets) throws Exception {
+    private void processHiveEntity(HiveMetaStoreBridge dgiBridge, HiveEventContext event, Entity entity, Map<String, Referenceable> dataSets, Set<Referenceable> entities) throws Exception {
         if (entity.getType() == Type.TABLE || entity.getType() == Type.PARTITION) {
             final String tblQFName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), entity.getTable());
             if (!dataSets.containsKey(tblQFName)) {
-                Referenceable inTable = createOrUpdateEntities(dgiBridge, event.getUser(), entity, false);
-                dataSets.put(tblQFName, inTable);
+                LinkedHashMap<Type, Referenceable> result = createOrUpdateEntities(dgiBridge, event.getUser(), entity, false);
+                dataSets.put(tblQFName, result.get(Type.TABLE));
+                entities.addAll(result.values());
             }
         } else if (entity.getType() == Type.DFS_DIR) {
             final String pathUri = lower(new Path(entity.getLocation()).toString());
             LOG.info("Registering DFS Path {} ", pathUri);
             Referenceable hdfsPath = dgiBridge.fillHDFSDataSet(pathUri);
             dataSets.put(pathUri, hdfsPath);
+            entities.add(hdfsPath);
         }
     }
@@ -642,8 +660,9 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return false;
     }

-    private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final Entity entity, final Referenceable tblRef) throws HiveException, MalformedURLException {
-        Table hiveTable = entity.getTable();
+    private void handleExternalTables(final HiveMetaStoreBridge dgiBridge, final HiveEventContext event, final LinkedHashMap<Type, Referenceable> tables) throws HiveException, MalformedURLException {
+        List<Referenceable> entities = new ArrayList<>();
+        Table hiveTable = getEntityByType(event.getOutputs(), Type.TABLE).getTable();

         //Refresh to get the correct location
         hiveTable = dgiBridge.hiveClient.getTable(hiveTable.getDbName(), hiveTable.getTableName());
@@ -655,11 +674,13 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
             }};

             List<Referenceable> outputs = new ArrayList<Referenceable>() {{
-                add(tblRef);
+                add(tables.get(Type.TABLE));
             }};

             Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
-            messages.add(new HookNotification.EntityCreateRequest(event.getUser(), processReferenceable));
+            entities.addAll(tables.values());
+            entities.add(processReferenceable);
+            messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), entities));
         }
     }


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fefd165/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index c3f0bb6..00c17e8 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -248,7 +248,7 @@ public class HiveHookIT {
         verifyTimestamps(processReference, "startTime");
         verifyTimestamps(processReference, "endTime");

-        validateHDFSPaths(processReference, pFile, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, pFile);
     }

     private void validateOutputTables(Referenceable processReference, String... expectedTableNames) throws Exception {
@@ -301,6 +301,35 @@ public class HiveHookIT {
     }

     @Test
+    public void testDropAndRecreateCTASOutput() throws Exception {
+        String tableName = createTable();
+        String ctasTableName = "table" + random();
+        String query = "create table " + ctasTableName + " as select * from " + tableName;
+        runCommand(query);
+
+        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+        String processId = assertProcessIsRegistered(query);
+
+        final String drpquery = String.format("drop table %s ", ctasTableName);
+        runCommand(drpquery);
+        assertTableIsNotRegistered(DEFAULT_DB, ctasTableName);
+
+        //Fix after ATLAS-876
+        runCommand(query);
+        assertTableIsRegistered(DEFAULT_DB, ctasTableName);
+        String process2Id = assertProcessIsRegistered(query);
+
+        Assert.assertEquals(process2Id, processId);
+
+        Referenceable processRef = atlasClient.getEntity(processId);
+        String tblQlfdname = getQualifiedTblName(tableName);
+        String ctasQlfdname = getQualifiedTblName(ctasTableName);
+
+        validateInputTables(processRef, tblQlfdname);
+        validateOutputTables(processRef, ctasQlfdname, ctasQlfdname);
+    }
+
+    @Test
     public void testCreateView() throws Exception {
         String tableName = createTable();
         String viewName = tableName();
@@ -402,7 +431,7 @@ public class HiveHookIT {
         final String tblQlfdName = getQualifiedTblName(tableName);

         Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
-        validateHDFSPaths(processReference, loadFile, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, loadFile);

         validateOutputTables(processReference, tblQlfdName);
     }
@@ -416,8 +445,8 @@ public class HiveHookIT {
         return inputtblQlfdName;
     }

-    private Referenceable validateProcess(String query, String inputTable, String outputTable) throws Exception {
-        String processId = assertProcessIsRegistered(query, inputTable, outputTable);
+    private Referenceable validateProcess(String query, String inputTable, String... outputTables) throws Exception {
+        String processId = assertProcessIsRegistered(query, inputTable, outputTables);
         Referenceable process = atlasClient.getEntity(processId);
         if (inputTable == null) {
             Assert.assertNull(process.get(INPUTS));
@@ -426,11 +455,11 @@ public class HiveHookIT {
             validateInputTables(process, inputTable);
         }

-        if (outputTable == null) {
+        if (outputTables == null) {
             Assert.assertNull(process.get(OUTPUTS));
         } else {
-            Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1 );
-            validateOutputTables(process, outputTable);
+            Assert.assertEquals(((List<Id>) process.get(OUTPUTS)).size(), 1);
+            validateOutputTables(process, outputTables);
         }

         return process;
@@ -472,6 +501,43 @@ public class HiveHookIT {
     }

     @Test
+    public void testUpdateProcess() throws Exception {
+        String tableName = createTable();
+        String pFile1 = createTestDFSPath("somedfspath1");
+        String testPathNormed = lower(new Path(pFile1).toString());
+        String query =
+            "insert overwrite DIRECTORY '" + pFile1 + "' select id, name from " + tableName;
+
+        runCommand(query);
+        String tblQlfdname = getQualifiedTblName(tableName);
+        Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
+        validateHDFSPaths(processReference, OUTPUTS, pFile1);
+
+        String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
+
+        validateInputTables(processReference, tblQlfdname);
+
+        //Rerun same query with same HDFS path
+
+        runCommand(query);
+        Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
+        validateHDFSPaths(process2Reference, OUTPUTS, pFile1);
+
+        Assert.assertEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
+
+        //Rerun same query with a new HDFS path. Should create a new process
+        String pFile2 = createTestDFSPath("somedfspath2");
+        query = "insert overwrite DIRECTORY '" + pFile2 + "' select id, name from " + tableName;
+        final String testPathNormed2 = lower(new Path(pFile2).toString());
+        runCommand(query);
+
+        Referenceable process3Reference = validateProcess(query, tblQlfdname, testPathNormed2);
+        validateHDFSPaths(process3Reference, OUTPUTS, pFile2);
+
+        Assert.assertNotEquals(process3Reference.getId()._getId(), processReference.getId()._getId());
+    }
+
+    @Test
     public void testInsertIntoDFSDir() throws Exception {
         String tableName = createTable();
         String pFile1 = createTestDFSPath("somedfspath1");
@@ -482,7 +548,7 @@ public class HiveHookIT {
         runCommand(query);
         String tblQlfdname = getQualifiedTblName(tableName);
         Referenceable processReference = validateProcess(query, tblQlfdname, testPathNormed);
-        validateHDFSPaths(processReference, pFile1, OUTPUTS);
+        validateHDFSPaths(processReference, OUTPUTS, pFile1);

         String tableId = assertTableIsRegistered(DEFAULT_DB, tableName);
@@ -498,7 +564,7 @@ public class HiveHookIT {
         runCommand(query);
         tblQlfdname = getQualifiedTblName(tableName);
         Referenceable process2Reference = validateProcess(query, tblQlfdname, testPathNormed);
-        validateHDFSPaths(process2Reference, pFile2, OUTPUTS);
+        validateHDFSPaths(process2Reference, OUTPUTS, pFile2);

         Assert.assertNotEquals(process2Reference.getId()._getId(), processReference.getId()._getId());
     }
@@ -564,7 +630,7 @@ public class HiveHookIT {
         runCommand(query);
         String tblQlfName = getQualifiedTblName(tableName);
         Referenceable processReference = validateProcess(query, tblQlfName, testPathNormed);
-        validateHDFSPaths(processReference, filename, OUTPUTS);
+        validateHDFSPaths(processReference, OUTPUTS, filename);
         validateInputTables(processReference, tblQlfName);

         //Import
@@ -575,7 +641,7 @@ public class HiveHookIT {
         runCommand(query);
         tblQlfName = getQualifiedTblName(tableName);
         processReference = validateProcess(query, testPathNormed, tblQlfName);
-        validateHDFSPaths(processReference, filename, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, filename);

         validateOutputTables(processReference, tblQlfName);
     }
@@ -596,7 +662,7 @@ public class HiveHookIT {
         runCommand(query);
         String tblQlfdName = getQualifiedTblName(tableName);
         Referenceable processReference = validateProcess(query, tblQlfdName, testPathNormed);
-        validateHDFSPaths(processReference, filename, OUTPUTS);
+        validateHDFSPaths(processReference, OUTPUTS, filename);

         validateInputTables(processReference, tblQlfdName);
@@ -608,7 +674,7 @@ public class HiveHookIT {
         runCommand(query);
         tblQlfdName = getQualifiedTblName(tableName);
         processReference = validateProcess(query, testPathNormed, tblQlfdName);
-        validateHDFSPaths(processReference, filename, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, filename);

         validateOutputTables(processReference, tblQlfdName);
     }
@@ -997,22 +1063,22 @@ public class HiveHookIT {
         final String testPathNormed = lower(new Path(testPath).toString());

         Referenceable processReference = validateProcess(query, testPathNormed, tblQlfdName);
-        validateHDFSPaths(processReference, testPath, INPUTS);
+        validateHDFSPaths(processReference, INPUTS, testPath);
     }

-    private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception {
+    private void validateHDFSPaths(Referenceable processReference, String attributeName, String... testPaths) throws Exception {
         List<Id> hdfsPathRefs = (List<Id>) processReference.get(attributeName);

-        final String testPathNormed = lower(new Path(testPath).toString());
-        String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
-        Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);
-
-        Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
-        Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
-        Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
-        Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
+        for (int i = 0; i < testPaths.length; i++) {
+            final String testPathNormed = lower(new Path(testPaths[i]).toString());
+            String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed);
+            Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId);

-        return hdfsPathRef.getId()._getId();
+            Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId);
+            Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed);
+            Assert.assertEquals(hdfsPathRef.get(NAME), new Path(testPathNormed).getName());
+            Assert.assertEquals(hdfsPathRef.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME), testPathNormed);
+        }
     }

     private String assertHDFSPathIsRegistered(String path) throws Exception {
@@ -1393,13 +1459,12 @@ public class HiveHookIT {
         }
     }

-    private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String outputTblName) throws Exception {
+    private String assertProcessIsRegistered(final String queryStr, final String inputTblName, final String... outputTblNames) throws Exception {
         HiveASTRewriter astRewriter = new HiveASTRewriter(conf);
         String normalizedQuery = normalize(astRewriter.rewrite(queryStr));

         List<Referenceable> inputs = null;
-
         if (inputTblName != null) {
             Referenceable inputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
                 put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, inputTblName);
             }});
@@ -1407,14 +1472,16 @@ public class HiveHookIT {
             inputs = new ArrayList<Referenceable>();
             inputs.add(inputTableRef);
         }
-        List<Referenceable> outputs = null;
-        if (outputTblName != null) {
-            Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
-                put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, outputTblName);
-            }});
-
-            outputs = new ArrayList<Referenceable>();
-            outputs.add(outputTableRef);
+        List<Referenceable> outputs = new ArrayList<Referenceable>();
+        if (outputTblNames != null) {
+            for(int i = 0; i < outputTblNames.length; i++) {
+                final String outputTblName = outputTblNames[i];
+                Referenceable outputTableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.name(), new HashMap<String, Object>() {{
+                    put(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, outputTblName);
+                }});
+
+                outputs.add(outputTableRef);
+            }
         }
         String processQFName = HiveHook.getProcessQualifiedName(normalizedQuery, inputs, outputs);
         LOG.debug("Searching for process with query {}", processQFName);


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/8fefd165/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 76c5335..f168af6 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -23,6 +23,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)

 ALL CHANGES:
+ATLAS-884 Process registration should call Entity update instead of create (sumasai)
 ATLAS-515 Ability to initialize Kafka topics with more than 1 replica (yhemanth)
 ATLAS-891 UI changes to implement Update term (Kalyanikashikar via yhemanth)
 ATLAS-794 Business Catalog Update (jspeidel via yhemanth)
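
The gist of the HiveHook change above is that lineage registration now batches the process entity together with every referenced table, database, and HDFS path, and sends them in one EntityUpdateRequest (create-or-update semantics) instead of an EntityCreateRequest, so re-running the same query updates the existing process entity rather than registering a duplicate. The sketch below illustrates that pattern using only the types visible in the diff; the class name ProcessRegistrationSketch, the method buildProcessUpdateMessage, and its arguments are illustrative only and are not part of the actual hook, and the Atlas import paths shown reflect the incubator-era modules and should be verified against your tree.

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.atlas.notification.hook.HookNotification;   // assumed package for this Atlas version
import org.apache.atlas.typesystem.Referenceable;              // assumed package for this Atlas version

public class ProcessRegistrationSketch {

    // Illustrative helper: mirrors the shape of the hook's process registration after ATLAS-884.
    static HookNotification.EntityUpdateRequest buildProcessUpdateMessage(
            String user,
            Set<Referenceable> referencedEntities,   // tables, DBs, HDFS paths already created/updated
            Referenceable processReferenceable) {    // the process entity for this query

        // Keep insertion order and drop duplicates, as the hook does with its LinkedHashSet.
        Set<Referenceable> entities = new LinkedHashSet<>(referencedEntities);
        entities.add(processReferenceable);

        // Before ATLAS-884 the hook emitted an EntityCreateRequest here, registering a new
        // process entity on every run; the update request creates-or-updates instead.
        return new HookNotification.EntityUpdateRequest(user, new ArrayList<Referenceable>(entities));
    }
}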
