Repository: incubator-atlas
Updated Branches:
  refs/heads/master 377bc6351 -> 76b056038
ATLAS-948 import-hive should allow an option to continue after failure (sumasai)

Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/76b05603
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/76b05603
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/76b05603

Branch: refs/heads/master
Commit: 76b0560384f53ecb68b1b7e3e702bfde7e88e59b
Parents: 377bc63
Author: Suma Shivaprasad <[email protected]>
Authored: Tue Jun 28 10:34:33 2016 -0700
Committer: Suma Shivaprasad <[email protected]>
Committed: Tue Jun 28 10:34:33 2016 -0700

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java  | 114 ++++++++++++-------
 .../hive/bridge/HiveMetaStoreBridgeTest.java    |  82 +++++++++++--
 release-log.txt                                 |   1 +
 3 files changed, 143 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/76b05603/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index 4d009e8..0045780 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -33,6 +33,10 @@ import org.apache.atlas.typesystem.json.InstanceSerialization;
 import org.apache.atlas.typesystem.json.TypesSerialization;
 import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.atlas.utils.AuthenticationUtil;
+import org.apache.commons.cli.BasicParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.fs.Path;
@@ -111,17 +115,16 @@ public class HiveMetaStoreBridge {
         return atlasClient;
     }
 
-    void importHiveMetadata() throws Exception {
+    void importHiveMetadata(boolean failOnError) throws Exception {
         LOG.info("Importing hive metadata");
-        importDatabases();
+        importDatabases(failOnError);
     }
 
-    private void importDatabases() throws Exception {
+    private void importDatabases(boolean failOnError) throws Exception {
         List<String> databases = hiveClient.getAllDatabases();
         for (String databaseName : databases) {
             Referenceable dbReference = registerDatabase(databaseName);
-
-            importTables(dbReference, databaseName);
+            importTables(dbReference, databaseName, failOnError);
         }
     }
 
@@ -254,52 +257,68 @@ public class HiveMetaStoreBridge {
 
     /**
      * Imports all tables for the given db
-     * @param databaseName
      * @param databaseReferenceable
+     * @param databaseName
+     * @param failOnError
      * @throws Exception
      */
-    private void importTables(Referenceable databaseReferenceable, String databaseName) throws Exception {
+    private int importTables(Referenceable databaseReferenceable, String databaseName, final boolean failOnError) throws Exception {
+        int tablesImported = 0;
         List<String> hiveTables = hiveClient.getAllTables(databaseName);
         LOG.info("Importing tables {} for db {}", hiveTables.toString(), databaseName);
         for (String tableName : hiveTables) {
-            Table table = hiveClient.getTable(databaseName, tableName);
-            Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
-            if (table.getTableType() == TableType.EXTERNAL_TABLE){
-                String tableQualifiedName = getTableQualifiedName(clusterName, table);
-                Referenceable process = getProcessReference(tableQualifiedName);
-                if (process == null){
-                    LOG.info("Attempting to register create table process for {}", tableQualifiedName);
-                    Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
-                    ArrayList<Referenceable> sourceList = new ArrayList<>();
-                    ArrayList<Referenceable> targetList = new ArrayList<>();
-                    String tableLocation = table.getDataLocation().toString();
-                    Referenceable path = fillHDFSDataSet(tableLocation);
-                    String query = getCreateTableString(table, tableLocation);
-                    sourceList.add(path);
-                    targetList.add(tableReferenceable);
-                    lineageProcess.set("inputs", sourceList);
-                    lineageProcess.set("outputs", targetList);
-                    lineageProcess.set("userName", table.getOwner());
-                    lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
-                    lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
-                    lineageProcess.set("operationType", "CREATETABLE");
-                    lineageProcess.set("queryText", query);
-                    lineageProcess.set("queryId", query);
-                    lineageProcess.set("queryPlan", "{}");
-                    lineageProcess.set("clusterName", clusterName);
-                    List<String> recentQueries = new ArrayList<>(1);
-                    recentQueries.add(query);
-                    lineageProcess.set("recentQueries", recentQueries);
-                    lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
-                    lineageProcess.set(AtlasClient.NAME, query);
-                    registerInstance(lineageProcess);
-
+            try {
+                Table table = hiveClient.getTable(databaseName, tableName);
+                Referenceable tableReferenceable = registerTable(databaseReferenceable, table);
+                tablesImported++;
+                if (table.getTableType() == TableType.EXTERNAL_TABLE) {
+                    String tableQualifiedName = getTableQualifiedName(clusterName, table);
+                    Referenceable process = getProcessReference(tableQualifiedName);
+                    if (process == null) {
+                        LOG.info("Attempting to register create table process for {}", tableQualifiedName);
+                        Referenceable lineageProcess = new Referenceable(HiveDataTypes.HIVE_PROCESS.getName());
+                        ArrayList<Referenceable> sourceList = new ArrayList<>();
+                        ArrayList<Referenceable> targetList = new ArrayList<>();
+                        String tableLocation = table.getDataLocation().toString();
+                        Referenceable path = fillHDFSDataSet(tableLocation);
+                        String query = getCreateTableString(table, tableLocation);
+                        sourceList.add(path);
+                        targetList.add(tableReferenceable);
+                        lineageProcess.set("inputs", sourceList);
+                        lineageProcess.set("outputs", targetList);
+                        lineageProcess.set("userName", table.getOwner());
+                        lineageProcess.set("startTime", new Date(System.currentTimeMillis()));
+                        lineageProcess.set("endTime", new Date(System.currentTimeMillis()));
+                        lineageProcess.set("operationType", "CREATETABLE");
+                        lineageProcess.set("queryText", query);
+                        lineageProcess.set("queryId", query);
+                        lineageProcess.set("queryPlan", "{}");
+                        lineageProcess.set("clusterName", clusterName);
+                        List<String> recentQueries = new ArrayList<>(1);
+                        recentQueries.add(query);
+                        lineageProcess.set("recentQueries", recentQueries);
+                        lineageProcess.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, tableQualifiedName);
+                        lineageProcess.set(AtlasClient.NAME, query);
+                        registerInstance(lineageProcess);
+                    } else {
+                        LOG.info("Process {} is already registered", process.toString());
+                    }
                 }
-                else {
-                    LOG.info("Process {} is already registered", process.toString());
+            } catch (Exception e) {
+                LOG.error("Import failed for hive_table {} ", tableName, e);
+                if (failOnError) {
+                    throw e;
                 }
             }
         }
+
+        if ( tablesImported == hiveTables.size()) {
+            LOG.info("Successfully imported all {} tables from {} ", tablesImported, databaseName);
+        } else {
+            LOG.error("Unable to import {} tables out of {} tables from {}", tablesImported, hiveTables.size(), databaseName);
+        }
+
+        return tablesImported;
     }
 
     /**
@@ -618,7 +637,7 @@ public class HiveMetaStoreBridge {
         }
     }
 
-    public static void main(String[] argv) throws Exception {
+    public static void main(String[] args) throws Exception {
         Configuration atlasConf = ApplicationProperties.get();
         String atlasEndpoint = atlasConf.getString(ATLAS_ENDPOINT, DEFAULT_DGI_URL);
 
@@ -632,8 +651,17 @@ public class HiveMetaStoreBridge {
             atlasClient = new AtlasClient(ugi, ugi.getShortUserName(), atlasEndpoint);
         }
 
+        Options options = new Options();
+        CommandLineParser parser = new BasicParser();
+        CommandLine cmd = parser.parse( options, args);
+
+        boolean failOnError = false;
+        if (cmd.hasOption("failOnError")) {
+            failOnError = true;
+        }
+
         HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(new HiveConf(), atlasClient);
         hiveMetaStoreBridge.registerHiveDataModel();
-        hiveMetaStoreBridge.importHiveMetadata();
+        hiveMetaStoreBridge.importHiveMetadata(failOnError);
     }
 }
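Note that the committed main() parses args against an empty Options instance. commons-cli only reports hasOption(...) for options that were registered, and in commons-cli 1.x BasicParser typically throws UnrecognizedOptionException for an option token it does not know. A minimal sketch of how the flag could be registered (the option name comes from the diff; the class name and description are illustrative, not part of the commit):

    import org.apache.commons.cli.BasicParser;
    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.CommandLineParser;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    // Sketch only: registering the failOnError flag so commons-cli recognizes it.
    public class ImportHiveArgsSketch {
        public static void main(String[] args) throws ParseException {
            Options options = new Options();
            // Boolean flag: present => abort on the first failed table,
            // absent => log the failure and continue with the next table.
            options.addOption("failOnError", false, "stop the import on the first table that fails");

            CommandLineParser parser = new BasicParser();
            CommandLine cmd = parser.parse(options, args);

            boolean failOnError = cmd.hasOption("failOnError");
            System.out.println("failOnError = " + failOnError);
        }
    }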

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/76b05603/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
index 856e5b1..9f7f6b0 100644
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridgeTest.java
@@ -41,6 +41,8 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import scala.actors.threadpool.Arrays;
 
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.mockito.Mockito.argThat;
@@ -78,7 +80,7 @@ public class HiveMetaStoreBridgeTest {
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
-        bridge.importHiveMetadata();
+        bridge.importHiveMetadata(true);
 
         // verify update is called
         verify(atlasClient).updateEntity(eq("72e06b34-9151-4023-aa9d-b82103a50e76"),
@@ -90,7 +92,7 @@ public class HiveMetaStoreBridgeTest {
     public void testImportThatUpdatesRegisteredTable() throws Exception {
         setupDB(hiveClient, TEST_DB_NAME);
 
-        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+        List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
 
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
@@ -99,12 +101,12 @@ public class HiveMetaStoreBridgeTest {
                 HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
                 getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
         when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
-        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTable);
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(0));
         when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
                 processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
-        bridge.importHiveMetadata();
+        bridge.importHiveMetadata(true);
 
         // verify update is called on table
         verify(atlasClient).updateEntity(eq("82e06b34-9151-4023-aa9d-b82103a50e77"),
@@ -119,11 +121,15 @@ public class HiveMetaStoreBridgeTest {
                 getEntityReference("72e06b34-9151-4023-aa9d-b82103a50e76"));
     }
 
-    private Table setupTable(Hive hiveClient, String databaseName, String tableName) throws HiveException {
-        when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(new String[]{tableName}));
-        Table testTable = createTestTable(databaseName, tableName);
-        when(hiveClient.getTable(databaseName, tableName)).thenReturn(testTable);
-        return testTable;
+    private List<Table> setupTables(Hive hiveClient, String databaseName, String... tableNames) throws HiveException {
+        List<Table> tables = new ArrayList<>();
+        when(hiveClient.getAllTables(databaseName)).thenReturn(Arrays.asList(tableNames));
+        for(String tableName : tableNames) {
+            Table testTable = createTestTable(databaseName, tableName);
+            when(hiveClient.getTable(databaseName, tableName)).thenReturn(testTable);
+            tables.add(testTable);
+        }
+        return tables;
     }
 
     private void setupDB(Hive hiveClient, String databaseName) throws HiveException {
@@ -135,7 +141,8 @@ public class HiveMetaStoreBridgeTest {
     @Test
     public void testImportWhenPartitionKeysAreNull() throws Exception {
         setupDB(hiveClient, TEST_DB_NAME);
-        Table hiveTable = setupTable(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+        List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME);
+        Table hiveTable = hiveTables.get(0);
 
         returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
 
@@ -157,12 +164,65 @@ public class HiveMetaStoreBridgeTest {
 
         HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
         try {
-            bridge.importHiveMetadata();
+            bridge.importHiveMetadata(true);
         } catch (Exception e) {
             Assert.fail("Partition with null key caused import to fail with exception ", e);
         }
     }
 
+    @Test
+    public void testImportContinuesWhenTableRegistrationFails() throws Exception {
+        setupDB(hiveClient, TEST_DB_NAME);
+        final String table2Name = TEST_TABLE_NAME + "_1";
+        List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name);
+
+        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
+
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
+                table2Name,
+                HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+                processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+        try {
+            bridge.importHiveMetadata(false);
+        } catch (Exception e) {
+            Assert.fail("Table registration failed with exception", e);
+        }
+    }
+
+    @Test
+    public void testImportFailsWhenTableRegistrationFails() throws Exception {
+        setupDB(hiveClient, TEST_DB_NAME);
+        final String table2Name = TEST_TABLE_NAME + "_1";
+        List<Table> hiveTables = setupTables(hiveClient, TEST_DB_NAME, TEST_TABLE_NAME, table2Name);
+
+        returnExistingDatabase(TEST_DB_NAME, atlasClient, CLUSTER_NAME);
+        when(hiveClient.getTable(TEST_DB_NAME, TEST_TABLE_NAME)).thenThrow(new RuntimeException("Timeout while reading data from hive metastore"));
+
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getTableDSLQuery(CLUSTER_NAME, TEST_DB_NAME,
+                table2Name,
+                HiveDataTypes.HIVE_TABLE.getName(), false))).thenReturn(
+                getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+        when(atlasClient.getEntity("82e06b34-9151-4023-aa9d-b82103a50e77")).thenReturn(createTableReference());
+        String processQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, hiveTables.get(1));
+        when(atlasClient.searchByDSL(HiveMetaStoreBridge.getProcessDSLQuery(HiveDataTypes.HIVE_PROCESS.getName(),
+                processQualifiedName))).thenReturn(getEntityReference("82e06b34-9151-4023-aa9d-b82103a50e77"));
+
+        HiveMetaStoreBridge bridge = new HiveMetaStoreBridge(CLUSTER_NAME, hiveClient, atlasClient);
+        try {
+            bridge.importHiveMetadata(true);
+            Assert.fail("Table registration is supposed to fail");
+        } catch (Exception e) {
+            //Expected
+        }
+    }
+
     private JSONArray getEntityReference(String id) throws JSONException {
         return new JSONArray(String.format("[{\"$id$\":{\"id\":\"%s\"}}]", id));
     }


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/76b05603/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 20fb7f1..e114e31 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -6,6 +6,7 @@ INCOMPATIBLE CHANGES:
 
 ALL CHANGES:
+ATLAS-948 import-hive should allow an option to continue after failure (sumasai)
 ATLAS-954 Get hadoop classpath if command hadoop is in PATH (svimal2106 via sumasai)
 ATLAS-919 UI : Deleted references should be shown in red or filtered out (kevalbhatt18 via sumasai)
 ATLAS-927 aboutAtlas_tmpl.html has hard-coded project version (Kalyanikashikar via yhemanth)
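The new importTables(...) returns the number of tables it successfully imported, but importDatabases(...) in the diff above discards that value. If a cross-database summary were wanted, a caller could aggregate the counts along these lines (a hypothetical variant for illustration, not part of this commit):

    // Hypothetical variant of importDatabases(...): aggregate the per-database
    // counts that importTables(...) now returns. hiveClient, registerDatabase,
    // importTables and LOG are the existing members of HiveMetaStoreBridge.
    private void importDatabases(boolean failOnError) throws Exception {
        List<String> databases = hiveClient.getAllDatabases();
        int totalTablesImported = 0;
        for (String databaseName : databases) {
            Referenceable dbReference = registerDatabase(databaseName);
            // importTables returns how many tables were registered successfully
            totalTablesImported += importTables(dbReference, databaseName, failOnError);
        }
        LOG.info("Imported {} tables across {} databases", totalTablesImported, databases.size());
    }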
