Repository: incubator-atlas

Updated Branches:
  refs/heads/master 4f6816572 -> 9d1040b7c
ATLAS-642 import-hive should create the lineage for external tables (svimal2106 via sumasai)

Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/9d1040b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/9d1040b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/9d1040b7

Branch: refs/heads/master
Commit: 9d1040b7cf8e19728ad4c8d12ee24da28fb05ff8
Parents: 4f68165
Author: Suma Shivaprasad <[email protected]>
Authored: Fri Jun 17 10:56:52 2016 -0700
Committer: Suma Shivaprasad <[email protected]>
Committed: Fri Jun 17 10:57:44 2016 -0700

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  | 66 +++++++++++++++++++-
 .../org/apache/atlas/hive/hook/HiveHook.java    |  4 ++
 .../hive/bridge/HiveMetaStoreBridgeTest.java    |  8 ++-
 .../org/apache/atlas/hive/hook/HiveHookIT.java  |  4 +-
 release-log.txt                                 |  1 +
 5 files changed, 78 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 9732bce..c1940a6 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -27,6 +27,7 @@ import org.apache.atlas.fs.model.FSDataModel;
 import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
 import org.apache.atlas.hive.model.HiveDataTypes;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.apache.atlas.typesystem.json.InstanceSerialization;
@@ -37,6 +38,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -239,6 +241,18 @@ public class HiveMetaStoreBridge {
         return String.format("%s@%s", dbName.toLowerCase(), clusterName);
     }
 
+    private String getCreateTableString(Table table, String location){
+        String colString = "";
+        List<FieldSchema> colList = table.getAllCols();
+        for(FieldSchema col:colList){
+            colString += col.getName() + " " + col.getType() + ",";
+        }
+        colString = colString.substring(0, colString.length() - 1);
+        String query = "create external table " + table.getTableName() + "(" + colString + ")" +
+                " location '" + location + "'";
+        return query;
+    }
+
     /**
      * Imports all tables for the given db
      * @param databaseName
@@ -247,10 +261,45 @@ public class HiveMetaStoreBridge {
      */
     private void importTables(Referenceable databaseReferenceable, String databaseName) throws Exception {
         List<String> hiveTables = hiveClient.getAllTables(databaseName);
-
+        LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
         for (String tableName : hiveTables) {
             Table table = hiveClient.getTable(databaseName, tableName);
             Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
+            if (table.getTableType() == TableType.EXTERNAL_TABLE){
+                String tableQualifiedName = getTableQualifiedName(clusterName, table);
+                Referenceable process = getProcessReference(tableQualifiedName);
+                if (process == null){
+                    LOG.info("Attempting to register create table process for {}", tableQualifiedName);
+                    Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+                    ArrayList<Referenceable> sourceList = new ArrayList<>();
+                    ArrayList<Referenceable> targetList = new ArrayList<>();
+                    String tableLocation = table.getDataLocation().toString();
+                    Referenceable path = fillHDFSDataSet(tableLocation);
+                    String query = getCreateTableString(table, tableLocation);
+                    sourceList.add(path);
+                    targetList.add(tableReferenceable);
+                    lineageProcess.set("inputs", sourceList);
+                    lineageProcess.set("outputs", targetList);
+                    lineageProcess.set("userName", table.getOwner());
+                    lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
+                    lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
+                    lineageProcess.set("operationType", "CREATETABLE");
+                    lineageProcess.set("queryText", query);
+                    lineageProcess.set("queryId", query);
+                    lineageProcess.set("queryPlan", "{}");
+                    lineageProcess.set("clusterName", clusterName);
+                    List<String> recentQueries = new ArrayList<>(1);
+                    recentQueries.add(query);
+                    lineageProcess.set("recentQueries", recentQueries);
+                    lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
+                    lineageProcess.set(AtlasClient.NAME, query);
+                    registerInstance(lineageProcess);
+
+                }
+                else {
+                    LOG.info("Process {} is already registered", process.toString());
+                }
+            }
         }
     }
 
@@ -269,9 +318,21 @@ public class HiveMetaStoreBridge {
         return getEntityReferenceFromDSL(typeName, dslQuery);
     }
 
+    private Referenceable getProcessReference(String qualifiedName) throws Exception{
+        LOG.debug("Getting reference for process {}", qualifiedName);
+        String typeName = HiveDataTypes.HIVE_PROCESS.getName();
+        String dslQuery = getProcessDSLQuery(typeName, qualifiedName);
+        return getEntityReferenceFromDSL(typeName, dslQuery);
+    }
+
+    static String getProcessDSLQuery(String typeName, String qualifiedName) throws Exception{
+        String dslQuery = String.format("%s as t where qualifiedName = '%s'", typeName, qualifiedName);
+        return dslQuery;
+    }
+
     static String getTableDSLQuery(String clusterName, String dbName, String tableName, String typeName, boolean isTemporary) {
         String entityName = getTableQualifiedName(clusterName, dbName, tableName, isTemporary);
-        return String.format("%s as t where name = '%s'", typeName, entityName);
+        return String.format("%s as t where qualifiedName = '%s'", typeName, entityName);
     }
 
     /**
@@ -398,6 +459,7 @@ public class HiveMetaStoreBridge {
         String tableName = table.getTableName();
         LOG.info("Attempting to register table [" + tableName + "]");
         Referenceable tableReference = getTableReference(table);
+        LOG.info("Found result " + tableReference);
         if (tableReference == null) {
             tableReference = createTableInstance(dbReference, table);
             tableReference = registerInstance(tableReference);
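
For orientation, a minimal standalone sketch (not part of the commit) of the strings the two new helpers above produce. The table "sales", its columns, the HDFS location and the cluster name "primary" are made-up values, and the literal "hive_process" stands in for HiveDataTypes.HIVE_PROCESS.getName().

    // Illustration only -- hypothetical table, location and cluster names.
    public class ExternalTableLineageSketch {
        public static void main(String[] args) {
            // Shape of the statement getCreateTableString(table, location) synthesizes
            // from the metastore columns (it is stored as queryText, never executed):
            String createQuery = "create external table sales(id int,amount double)"
                    + " location 'hdfs://namenode:8020/warehouse/sales'";

            // Shape of the lookup getProcessDSLQuery(typeName, qualifiedName) builds,
            // which importTables runs before registering a new lineage process:
            String dslQuery = String.format("%s as t where qualifiedName = '%s'",
                    "hive_process", "default.sales@primary");

            System.out.println(createQuery);
            System.out.println(dslQuery);
        }
    }

If the DSL lookup returns nothing, importTables registers a hive_process whose input is the table's HDFS location and whose output is the hive_table itself, which is what creates the lineage for pre-existing external tables during import.
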
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index 0ccb18b..23c82df 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -678,6 +678,10 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         }};
 
         Referenceable processReferenceable = getProcessReferenceable(dgiBridge, event, inputs, outputs);
+        String tableQualifiedName = dgiBridge.getTableQualifiedName(dgiBridge.getClusterName(), hiveTable);
+        if(isCreateOp(event)){
+            processReferenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
+        }
         entities.addAll(tables.values());
         entities.add(processReferenceable);
         messages.add(new HookNotification.EntityUpdateRequest(event.getUser(), entities));


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index dec5fcb..856e5b1 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -90,7 +90,7 @@ public class HiveMetaStoreBridgeTest {
     public void testImportThatUpdatesRegisteredTable() throws Exception {
         setupDB(hiveClient, TEST_DB_NAME);
 
-        setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
 
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
@@ -99,6 +99,9 @@ public class HiveMetaStoreBridgeTest {
             HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
                 getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+            processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
         bridge.importHiveMetadata();
@@ -140,6 +143,9 @@ public class HiveMetaStoreBridgeTest {
             TEST_TABLE_NAME,
             HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
                 getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+            processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
 
         Partition partition = mock(Partition.class);
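
The hook change above complements the bridge change: for create operations the process entity now also carries the owning table's qualified name via AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, and the unit test stubs the new searchByDSL lookup so import treats the lineage process as already registered. A small sketch (not from the commit) of the resulting convergence, assuming REFERENCEABLE_ATTRIBUTE_NAME resolves to "qualifiedName" as the DSL queries in this patch suggest, with hypothetical type and table names:

    import org.apache.atlas.typesystem.Referenceable;

    public class ProcessDedupSketch {
        public static void main(String[] args) {
            // Process as the HiveHook would shape it for a CREATE statement:
            Referenceable fromHook = new Referenceable("hive_process");
            fromHook.set("qualifiedName", "default.sales@primary");

            // Process as import-hive builds it for the same external table:
            Referenceable fromImport = new Referenceable("hive_process");
            fromImport.set("qualifiedName", "default.sales@primary");

            // Same unique attribute value on both sides, so the import-time DSL
            // lookup finds the hook-registered process and skips re-registering it.
            System.out.println(fromHook.get("qualifiedName").equals(fromImport.get("qualifiedName")));
        }
    }
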
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 00c17e8..f5904d6 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -240,8 +240,8 @@ public class HiveHookIT {
         final String query = String.format("create TEMPORARY EXTERNAL table %s.%s( %s, %s) location '%s'",
                 DEFAULT_DB , tableName , colName + " int", "name string", pFile);
         runCommand(query);
         assertTableIsRegistered(DEFAULT_DB, tableName, null, true);
-
-        String processId = assertProcessIsRegistered(query);
+        String processId = assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
+            HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName, true), null);
         Referenceable processReference = atlasClient.getEntity(processId);
         assertEquals(processReference.get("userName"), UserGroupInformation.getCurrentUser().getShortUserName());


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9d1040b7/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 225cc8e..e4256bf 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -24,6 +24,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-642 import-hive should create the lineage for external tables (svimal2106 via sumasai)
 ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth)
 ATLAS-911 Get entity by unique attribute doesn't enforce type (shwethags)
 ATLAS-899 Fix Hive Hook documentation (sumasai via yhemanth)
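
Finally, a hedged sketch of how the registered lineage could be inspected from a client, mirroring what the rewritten HiveHookIT assertion checks: the create-table process is addressed by the table's qualified name rather than by the query string. The Atlas endpoint, the cluster and table names, and the three-argument getEntity overload used here are assumptions, not taken from this commit:

    import org.apache.atlas.AtlasClient;
    import org.apache.atlas.typesystem.Referenceable;

    public class LineageLookupSketch {
        public static void main(String[] args) throws Exception {
            // Assumed endpoint and identifiers -- adjust for a real deployment.
            AtlasClient atlasClient = new AtlasClient("http://localhost:21000");

            // Fetch the create-table process by its unique qualifiedName attribute.
            Referenceable process = atlasClient.getEntity("hive_process",
                    "qualifiedName", "default.sales@primary");

            // queryText should carry the synthesized "create external table ..." statement.
            System.out.println(process.get("queryText"));
            System.out.println(process.get("userName"));
        }
    }
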
