ATLAS-622 Introduce soft delete (shwethags)
Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/40ee9492
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/40ee9492
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/40ee9492

Branch: refs/heads/master
Commit: 40ee9492192f8ea3afc4be7f6fa55803214e8247
Parents: daf812a
Author: Shwetha GS <[email protected]>
Authored: Sat Apr 16 09:14:01 2016 +0530
Committer: Shwetha GS <[email protected]>
Committed: Sat Apr 16 09:15:15 2016 +0530

----------------------------------------------------------------------
 .../atlas/hive/bridge/HiveMetaStoreBridge.java | 2 +-
 .../org/apache/atlas/hive/hook/HiveHook.java | 2 +-
 .../org/apache/atlas/hive/hook/HiveHookIT.java | 515 ++++++-----
 .../main/java/org/apache/atlas/AtlasClient.java | 10 +-
 .../org/apache/atlas/ApplicationProperties.java | 10 +
 release-log.txt | 1 +
 .../apache/atlas/RepositoryMetadataModule.java | 11 +
 .../repository/audit/EntityAuditListener.java | 10 +-
 .../atlas/repository/graph/AtlasEdgeLabel.java | 3 -
 .../atlas/repository/graph/DeleteHandler.java | 375 ++++++++
 .../graph/GraphBackedMetadataRepository.java | 81 +-
 .../graph/GraphBackedSearchIndexer.java | 10 +-
 .../atlas/repository/graph/GraphHelper.java | 204 ++--
 .../graph/GraphToTypedInstanceMapper.java | 146 +--
 .../repository/graph/HardDeleteHandler.java | 45 +
 .../repository/graph/SoftDeleteHandler.java | 55 ++
 .../graph/TypedInstanceToGraphMapper.java | 919 ++++++-------------
 .../typestore/GraphBackedTypeStore.java | 5 +-
 .../apache/atlas/BaseHiveRepositoryTest.java | 1 +
 .../test/java/org/apache/atlas/TestUtils.java | 78 +-
 .../GraphBackedDiscoveryServiceTest.java | 14 +-
 ...kedMetadataRepositoryDeleteEntitiesTest.java | 613 -------------
 ...hBackedMetadataRepositoryDeleteTestBase.java | 772 ++++++++++++++++
 .../GraphBackedMetadataRepositoryTest.java | 96 +-
 .../GraphBackedRepositoryHardDeleteTest.java | 121 +++
 .../GraphBackedRepositorySoftDeleteTest.java | 121 +++
 .../graph/GraphRepoMapperScaleTest.java | 7 +
 .../typestore/GraphBackedTypeStoreTest.java | 8 +-
 .../service/DefaultMetadataServiceTest.java | 204 ++--
 .../org/apache/atlas/query/GremlinTest.scala | 2 +-
 .../org/apache/atlas/query/GremlinTest2.scala | 2 +-
 .../apache/atlas/query/LineageQueryTest.scala | 2 +-
 .../apache/atlas/query/QueryTestsUtils.scala | 2 +-
 .../java/org/apache/atlas/RequestContext.java | 55 ++
 .../org/apache/atlas/typesystem/IInstance.java | 1 +
 .../apache/atlas/typesystem/Referenceable.java | 5 +
 .../org/apache/atlas/typesystem/Struct.java | 5 +
 .../persistence/DownCastStructInstance.java | 5 +
 .../apache/atlas/typesystem/persistence/Id.java | 22 +-
 .../persistence/ReferenceableInstance.java | 13 +
 .../typesystem/persistence/StructInstance.java | 5 +
 .../apache/atlas/ApplicationPropertiesTest.java | 29 +-
 .../test/resources/atlas-application.properties | 4 +-
 .../NotificationHookConsumerIT.java | 10 +-
 .../web/resources/EntityJerseyResourceIT.java | 33 +-
 45 files changed, 2666 insertions(+), 1968 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
index f007a32..6eb5194 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/bridge/HiveMetaStoreBridge.java
@@ -459,7 +459,7 @@ public class HiveMetaStoreBridge {
         final String[] parts = tableQualifiedName.split("@");
         final String tableName = parts[0];
         final String clusterName = parts[1];
-        return String.format("%s.%s@%s", tableName, colName, clusterName);
+        return String.format("%s.%s@%s", tableName, colName.toLowerCase(), clusterName);
     }
 
     public List<Referenceable> getColumns(List<FieldSchema> schemaList, String tableQualifiedName) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
index b9f00fd..4ee15d6 100755
--- a/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
+++ b/addons/hive-bridge/src/main/java/org/apache/atlas/hive/hook/HiveHook.java
@@ -481,7 +481,7 @@ public class HiveHook extends AtlasHook implements ExecuteWithHookContext {
         return entitiesCreatedOrUpdated;
     }
 
-    private String normalize(String str) {
+    public static String normalize(String str) {
         if (StringUtils.isEmpty(str)) {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
----------------------------------------------------------------------
diff --git a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
index 683f43c..5fda0d3 100755
--- a/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
+++ b/addons/hive-bridge/src/test/java/org/apache/atlas/hive/hook/HiveHookIT.java
@@ -20,8 +20,10 @@ package org.apache.atlas.hive.hook;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
+import com.sun.jersey.api.client.ClientResponse;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.fs.model.FSDataTypes;
 import org.apache.atlas.hive.bridge.HiveMetaStoreBridge;
 import org.apache.atlas.hive.model.HiveDataModelGenerator;
@@ -32,7 +34,6 @@ import org.apache.atlas.typesystem.persistence.Id;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.testng.Assert;
@@ -53,26 +53,24 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static
org.apache.atlas.hive.hook.HiveHook.normalize; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; public class HiveHookIT { - public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HiveHookIT.class); + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(HiveHookIT.class); private static final String DGI_URL = "http://localhost:21000/"; private static final String CLUSTER_NAME = "test"; public static final String DEFAULT_DB = "default"; private Driver driver; - private AtlasClient dgiCLient; + private AtlasClient atlasClient; private SessionState ss; private static final String INPUTS = AtlasClient.PROCESS_ATTRIBUTE_INPUTS; private static final String OUTPUTS = AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS; - private enum QUERY_TYPE { - GREMLIN, - DSL - } - @BeforeClass public void setUp() throws Exception { //Set-up hive session @@ -87,9 +85,9 @@ public class HiveHookIT { SessionState.setCurrentSessionState(ss); Configuration configuration = ApplicationProperties.get(); - dgiCLient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL)); + atlasClient = new AtlasClient(configuration.getString(HiveMetaStoreBridge.ATLAS_ENDPOINT, DGI_URL)); - HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, dgiCLient); + HiveMetaStoreBridge hiveMetaStoreBridge = new HiveMetaStoreBridge(conf, atlasClient); hiveMetaStoreBridge.registerHiveDataModel(); } @@ -107,7 +105,7 @@ public class HiveHookIT { runCommand("create database " + dbName + " WITH DBPROPERTIES ('p1'='v1', 'p2'='v2')"); String dbId = assertDatabaseIsRegistered(dbName); - Referenceable definition = dgiCLient.getEntity(dbId); + Referenceable definition = atlasClient.getEntity(dbId); Map params = (Map) definition.get(HiveDataModelGenerator.PARAMETERS); Assert.assertNotNull(params); Assert.assertEquals(params.size(), 2); @@ -115,11 +113,13 @@ public class HiveHookIT { //There should be just one entity per dbname runCommand("drop database " + dbName); + assertDBIsNotRegistered(dbName); + runCommand("create database " + dbName); String dbid = assertDatabaseIsRegistered(dbName); //assert on qualified name - Referenceable dbEntity = dgiCLient.getEntity(dbid); + Referenceable dbEntity = atlasClient.getEntity(dbid); Assert.assertEquals(dbEntity.get("qualifiedName"), dbName.toLowerCase() + "@" + CLUSTER_NAME); } @@ -149,7 +149,7 @@ public class HiveHookIT { private String createTable(boolean isPartitioned) throws Exception { String tableName = tableName(); runCommand("create table " + tableName + "(id int, name string) comment 'table comment' " + (isPartitioned ? 
- " partitioned by(dt string)" : "")); + " partitioned by(dt string)" : "")); return tableName; } @@ -174,14 +174,15 @@ public class HiveHookIT { assertTableIsRegistered(dbName, tableName); //there is only one instance of column registered - String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName)); - Referenceable colEntity = dgiCLient.getEntity(colId); + String colId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName), colName)); + Referenceable colEntity = atlasClient.getEntity(colId); Assert.assertEquals(colEntity.get("qualifiedName"), String.format("%s.%s.%s@%s", dbName.toLowerCase(), tableName.toLowerCase(), colName.toLowerCase(), CLUSTER_NAME)); tableName = createTable(); String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - Referenceable tableRef = dgiCLient.getEntity(tableId); + Referenceable tableRef = atlasClient.getEntity(tableId); Assert.assertEquals(tableRef.get("tableType"), TableType.MANAGED_TABLE.name()); Assert.assertEquals(tableRef.get(HiveDataModelGenerator.COMMENT), "table comment"); String entityName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); @@ -227,17 +228,19 @@ public class HiveHookIT { } private String assertColumnIsRegistered(String colName) throws Exception { - LOG.debug("Searching for column {}", colName.toLowerCase()); - String query = - String.format("%s where qualifiedName = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase()); - return assertEntityIsRegistered(query); + return assertColumnIsRegistered(colName, null); + } + + private String assertColumnIsRegistered(String colName, AssertPredicate assertPredicate) throws Exception { + LOG.debug("Searching for column {}", colName); + return assertEntityIsRegistered(HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + colName, assertPredicate); } private void assertColumnIsNotRegistered(String colName) throws Exception { LOG.debug("Searching for column {}", colName); - String query = - String.format("%s where qualifiedName = '%s'", HiveDataTypes.HIVE_COLUMN.getName(), colName.toLowerCase()); - assertEntityIsNotRegistered(QUERY_TYPE.DSL, query); + assertEntityIsNotRegistered(HiveDataTypes.HIVE_COLUMN.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + colName); } @Test @@ -277,7 +280,7 @@ public class HiveHookIT { //Check lineage which includes table1 String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName); - JSONObject response = dgiCLient.getInputGraph(datasetName); + JSONObject response = atlasClient.getInputGraph(datasetName); JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices"); Assert.assertTrue(vertices.has(viewId)); Assert.assertTrue(vertices.has(table1Id)); @@ -293,7 +296,7 @@ public class HiveHookIT { Assert.assertEquals(assertTableIsRegistered(DEFAULT_DB, viewName), viewId); datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName); - response = dgiCLient.getInputGraph(datasetName); + response = atlasClient.getInputGraph(datasetName); vertices = response.getJSONObject("values").getJSONObject("vertices"); Assert.assertTrue(vertices.has(viewId)); @@ -304,7 +307,7 @@ public class HiveHookIT { Assert.assertTrue(vertices.has(table1Id)); //Outputs dont exist - response = 
dgiCLient.getOutputGraph(datasetName); + response = atlasClient.getOutputGraph(datasetName); vertices = response.getJSONObject("values").getJSONObject("vertices"); Assert.assertEquals(vertices.length(), 0); } @@ -358,7 +361,7 @@ public class HiveHookIT { private Referenceable validateProcess(String query, int numInputs, int numOutputs) throws Exception { String processId = assertProcessIsRegistered(query); - Referenceable process = dgiCLient.getEntity(processId); + Referenceable process = atlasClient.getEntity(processId); if (numInputs == 0) { Assert.assertNull(process.get(INPUTS)); } else { @@ -376,7 +379,7 @@ public class HiveHookIT { private Referenceable validateProcess(String query, String[] inputs, String[] outputs) throws Exception { String processId = assertProcessIsRegistered(query); - Referenceable process = dgiCLient.getEntity(processId); + Referenceable process = atlasClient.getEntity(processId); if (inputs == null) { Assert.assertNull(process.get(INPUTS)); } else { @@ -406,7 +409,7 @@ public class HiveHookIT { String inputTableId = assertTableIsRegistered(DEFAULT_DB, tableName); String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName); - validateProcess(query, new String[] {inputTableId}, new String[] {opTableId}); + validateProcess(query, new String[]{inputTableId}, new String[]{opTableId}); } @Test @@ -450,7 +453,7 @@ public class HiveHookIT { String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName); String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName); - validateProcess(query, new String[] {ipTableId}, new String[] {opTableId}); + validateProcess(query, new String[]{ipTableId}, new String[]{opTableId}); } @Test @@ -465,7 +468,7 @@ public class HiveHookIT { String ipTableId = assertTableIsRegistered(DEFAULT_DB, tableName); String opTableId = assertTableIsRegistered(DEFAULT_DB, insertTableName); - validateProcess(query, new String[] {ipTableId}, new String[] {opTableId}); + validateProcess(query, new String[]{ipTableId}, new String[]{opTableId}); } private String random() { @@ -571,7 +574,7 @@ public class HiveHookIT { private List<Referenceable> getColumns(String dbName, String tableName) throws Exception { String tableId = assertTableIsRegistered(dbName, tableName); - Referenceable tableRef = dgiCLient.getEntity(tableId); + Referenceable tableRef = atlasClient.getEntity(tableId); return ((List<Referenceable>)tableRef.get(HiveDataModelGenerator.COLUMNS)); } @@ -582,7 +585,9 @@ public class HiveHookIT { String query = "alter table " + tableName + " add columns (" + column + " string)"; runCommand(query); - assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), column)); + assertColumnIsRegistered(HiveMetaStoreBridge + .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), + column)); //Verify the number of columns present in the table final List<Referenceable> columns = getColumns(DEFAULT_DB, tableName); @@ -595,13 +600,21 @@ public class HiveHookIT { final String colDropped = "id"; String query = "alter table " + tableName + " replace columns (name string)"; runCommand(query); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), colDropped)); + + assertColumnIsNotRegistered(HiveMetaStoreBridge + .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, 
tableName), + colDropped)); //Verify the number of columns present in the table - final List<Referenceable> columns = getColumns(DEFAULT_DB, tableName); - Assert.assertEquals(columns.size(), 1); + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable tableRef) throws Exception { + List<Referenceable> columns = (List<Referenceable>) tableRef.get(HiveDataModelGenerator.COLUMNS); + Assert.assertEquals(columns.size(), 1); + Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "name"); - Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "name"); + } + }); } @Test @@ -612,12 +625,15 @@ public class HiveHookIT { String tableName = createTable(); String query = String.format("alter table %s change %s %s string", tableName, oldColName, newColName); runCommand(query); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); - assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName)); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName)); //Verify the number of columns present in the table List<Referenceable> columns = getColumns(DEFAULT_DB, tableName); Assert.assertEquals(columns.size(), 2); + //Change column type oldColName = "name1"; newColName = "name2"; @@ -627,46 +643,70 @@ public class HiveHookIT { columns = getColumns(DEFAULT_DB, tableName); Assert.assertEquals(columns.size(), 2); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); - - String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); - assertColumnIsRegistered(newColQualifiedName); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); - Assert.assertEquals(columns.get(1).get("type"), "int"); + String newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); + assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable entity) throws Exception { + assertEquals(entity.get("type"), "int"); + } + }); //Change name and add comment oldColName = "name2"; newColName = "name3"; final String comment = "added comment"; - query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, newColName, newColType, comment); + query = String.format("alter table %s change column %s %s %s COMMENT '%s' after id", tableName, oldColName, + newColName, newColType, comment); runCommand(query); columns = getColumns(DEFAULT_DB, tableName); Assert.assertEquals(columns.size(), 2); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), 
oldColName)); - newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); - assertColumnIsRegistered(newColQualifiedName); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); - Assert.assertEquals(columns.get(1).get(HiveDataModelGenerator.COMMENT), comment); + assertColumnIsRegistered(newColQualifiedName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable entity) throws Exception { + assertEquals(entity.get(HiveDataModelGenerator.COMMENT), comment); + } + }); //Change column position oldColName = "name3"; newColName = "name4"; - query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, newColType); + query = String.format("alter table %s change column %s %s %s first", tableName, oldColName, newColName, + newColType); runCommand(query); columns = getColumns(DEFAULT_DB, tableName); Assert.assertEquals(columns.size(), 2); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); - newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + + newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); assertColumnIsRegistered(newColQualifiedName); - //Change col position again - Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), newColName); - Assert.assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), "id"); + final String finalNewColName = newColName; + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable entity) throws Exception { + List<Referenceable> columns = (List<Referenceable>) entity.get(HiveDataModelGenerator.COLUMNS); + assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), finalNewColName); + assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), "id"); + } + } + ); + //Change col position again oldColName = "name4"; newColName = "name5"; query = String.format("alter table %s change column %s %s %s after id", tableName, oldColName, newColName, newColType); @@ -675,16 +715,27 @@ public class HiveHookIT { columns = getColumns(DEFAULT_DB, tableName); Assert.assertEquals(columns.size(), 2); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); - newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), oldColName)); + + newColQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( + 
HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), newColName); assertColumnIsRegistered(newColQualifiedName); //Check col position - Assert.assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), newColName); - Assert.assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "id"); + final String finalNewColName2 = newColName; + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable entity) throws Exception { + List<Referenceable> columns = (List<Referenceable>) entity.get(HiveDataModelGenerator.COLUMNS); + assertEquals(columns.get(1).get(HiveDataModelGenerator.NAME), finalNewColName2); + assertEquals(columns.get(0).get(HiveDataModelGenerator.NAME), "id"); + } + } + ); } - @Test() + @Test public void testTruncateTable() throws Exception { String tableName = createTable(false); String query = String.format("truncate table %s", tableName); @@ -695,7 +746,7 @@ public class HiveHookIT { //Check lineage String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName); - JSONObject response = dgiCLient.getInputGraph(datasetName); + JSONObject response = atlasClient.getInputGraph(datasetName); JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices"); //Below should be assertTrue - Fix https://issues.apache.org/jira/browse/ATLAS-653 Assert.assertFalse(vertices.has(tableId)); @@ -708,15 +759,24 @@ public class HiveHookIT { String query = String.format("ALTER TABLE %s PARTITION COLUMN (dt %s)", tableName, newType); runCommand(query); - final String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - final String dtColId = assertColumnIsRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt")); - Referenceable table = dgiCLient.getEntity(tableId); - Referenceable column = dgiCLient.getEntity(dtColId); - Assert.assertEquals(column.get("type"), newType); + String colQualifiedName = HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "dt"); + final String dtColId = assertColumnIsRegistered(colQualifiedName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable column) throws Exception { + Assert.assertEquals(column.get("type"), newType); + } + }); - final List<Referenceable> partitionKeys = (List<Referenceable>) table.get("partitionKeys"); - Assert.assertEquals(partitionKeys.size(), 1); - Assert.assertEquals(partitionKeys.get(0).getId()._getId(), dtColId); + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable table) throws Exception { + final List<Referenceable> partitionKeys = (List<Referenceable>) table.get("partitionKeys"); + Assert.assertEquals(partitionKeys.size(), 1); + Assert.assertEquals(partitionKeys.get(0).getId()._getId(), dtColId); + + } + }); } @Test @@ -742,17 +802,18 @@ public class HiveHookIT { String query = "alter table " + tableName + " set location '" + testPath + "'"; runCommand(query); - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - //Verify the number of columns present in the table - Referenceable tableRef = dgiCLient.getEntity(tableId); - Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC); - Assert.assertEquals(sdRef.get("location"), testPath); + String tableId = 
assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable tableRef) throws Exception { + Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC); + Assert.assertEquals(sdRef.get("location"), testPath); + } + }); Referenceable processReference = validateProcess(query, 1, 1); validateHDFSPaths(processReference, testPath, INPUTS); validateOutputTables(processReference, tableId); - } private String validateHDFSPaths(Referenceable processReference, String testPath, String attributeName) throws Exception { @@ -762,7 +823,7 @@ public class HiveHookIT { String hdfsPathId = assertHDFSPathIsRegistered(testPathNormed); Assert.assertEquals(hdfsPathRefs.get(0)._getId(), hdfsPathId); - Referenceable hdfsPathRef = dgiCLient.getEntity(hdfsPathId); + Referenceable hdfsPathRef = atlasClient.getEntity(hdfsPathId); Assert.assertEquals(hdfsPathRef.get("path"), testPathNormed); Assert.assertEquals(hdfsPathRef.get("name"), testPathNormed); // Assert.assertEquals(hdfsPathRef.get("name"), new Path(testPath).getName()); @@ -771,14 +832,9 @@ public class HiveHookIT { return hdfsPathRef.getId()._getId(); } - private String assertHDFSPathIsRegistered(String path) throws Exception { - final String typeName = FSDataTypes.HDFS_PATH().toString(); - final String parentTypeName = FSDataTypes.FS_PATH().toString(); - String gremlinQuery = - String.format("g.V.has('__typeName', '%s').has('%s.path', \"%s\").toList()", typeName, parentTypeName, - normalize(path)); - return assertEntityIsRegistered(gremlinQuery); + LOG.debug("Searching for hdfs path {}", path); + return assertEntityIsRegistered(FSDataTypes.HDFS_PATH().toString(), "name", path, null); } @Test @@ -788,18 +844,25 @@ public class HiveHookIT { String query = "alter table " + tableName + " set FILEFORMAT " + testFormat; runCommand(query); - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - - Referenceable tableRef = dgiCLient.getEntity(tableId); - Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC); - Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_INPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); - Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_OUTPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); - Assert.assertNotNull(sdRef.get("serdeInfo")); - - Struct serdeInfo = (Struct) sdRef.get("serdeInfo"); - Assert.assertEquals(serdeInfo.get("serializationLib"), "org.apache.hadoop.hive.ql.io.orc.OrcSerde"); - Assert.assertNotNull(serdeInfo.get(HiveDataModelGenerator.PARAMETERS)); - Assert.assertEquals(((Map<String, String>)serdeInfo.get(HiveDataModelGenerator.PARAMETERS)).get("serialization.format"), "1"); + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable tableRef) throws Exception { + Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC); + Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_INPUT_FMT), + "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); + Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_OUTPUT_FMT), + "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); + Assert.assertNotNull(sdRef.get("serdeInfo")); + + Struct serdeInfo = (Struct) sdRef.get("serdeInfo"); + Assert.assertEquals(serdeInfo.get("serializationLib"), "org.apache.hadoop.hive.ql.io.orc.OrcSerde"); + 
Assert.assertNotNull(serdeInfo.get(HiveDataModelGenerator.PARAMETERS)); + Assert.assertEquals( + ((Map<String, String>) serdeInfo.get(HiveDataModelGenerator.PARAMETERS)) + .get("serialization.format"), + "1"); + } + }); /** @@ -807,7 +870,7 @@ public class HiveHookIT { * query = "alter table " + tableName + " STORED AS " + testFormat.toUpperCase(); * runCommand(query); - * tableRef = dgiCLient.getEntity(tableId); + * tableRef = atlasClient.getEntity(tableId); * sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC); * Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_INPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"); * Assert.assertEquals(sdRef.get(HiveDataModelGenerator.STORAGE_DESC_OUTPUT_FMT), "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"); @@ -818,31 +881,37 @@ public class HiveHookIT { @Test public void testAlterTableBucketingClusterSort() throws Exception { String tableName = createTable(); - ImmutableList<String> cols = ImmutableList.<String>of("id"); + ImmutableList<String> cols = ImmutableList.of("id"); runBucketSortQuery(tableName, 5, cols, cols); - cols = ImmutableList.<String>of("id", "name"); + cols = ImmutableList.of("id", "name"); runBucketSortQuery(tableName, 2, cols, cols); } - private void runBucketSortQuery(String tableName, int numBuckets, ImmutableList<String> bucketCols,ImmutableList<String> sortCols) throws Exception { + private void runBucketSortQuery(String tableName, final int numBuckets, final ImmutableList<String> bucketCols, + final ImmutableList<String> sortCols) throws Exception { final String fmtQuery = "alter table %s CLUSTERED BY (%s) SORTED BY (%s) INTO %s BUCKETS"; - String query = String.format(fmtQuery, tableName, stripListBrackets(bucketCols.toString()), stripListBrackets(sortCols.toString()), numBuckets); + String query = String.format(fmtQuery, tableName, stripListBrackets(bucketCols.toString()), + stripListBrackets(sortCols.toString()), numBuckets); runCommand(query); - verifyBucketSortingProperties(tableName, numBuckets, bucketCols, sortCols); + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable entity) throws Exception { + verifyBucketSortingProperties(entity, numBuckets, bucketCols, sortCols); + } + }); } private String stripListBrackets(String listElements) { return StringUtils.strip(StringUtils.strip(listElements, "["), "]"); } - private void verifyBucketSortingProperties(String tableName, int numBuckets, ImmutableList<String> bucketColNames, ImmutableList<String> sortcolNames) throws Exception { - - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - - Referenceable tableRef = dgiCLient.getEntity(tableId); - Referenceable sdRef = (Referenceable)tableRef.get(HiveDataModelGenerator.STORAGE_DESC); - Assert.assertEquals(((scala.math.BigInt) sdRef.get(HiveDataModelGenerator.STORAGE_NUM_BUCKETS)).intValue(), numBuckets); + private void verifyBucketSortingProperties(Referenceable tableRef, int numBuckets, + ImmutableList<String> bucketColNames, + ImmutableList<String> sortcolNames) throws Exception { + Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC); + Assert.assertEquals(((scala.math.BigInt) sdRef.get(HiveDataModelGenerator.STORAGE_NUM_BUCKETS)).intValue(), + numBuckets); Assert.assertEquals(sdRef.get("bucketCols"), bucketColNames); List<Struct> hiveOrderStructList = (List<Struct>) sdRef.get("sortCols"); @@ -851,7 +920,7 @@ public class HiveHookIT { for (int i = 0; i < 
sortcolNames.size(); i++) { Assert.assertEquals(hiveOrderStructList.get(i).get("col"), sortcolNames.get(i)); - Assert.assertEquals(((scala.math.BigInt)hiveOrderStructList.get(i).get("order")).intValue(), 1); + Assert.assertEquals(((scala.math.BigInt) hiveOrderStructList.get(i).get("order")).intValue(), 1); } } @@ -882,8 +951,12 @@ public class HiveHookIT { final String query = String.format("drop table %s ", tableName); runCommand(query); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "id")); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), "name")); + assertColumnIsNotRegistered(HiveMetaStoreBridge + .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), + "id")); + assertColumnIsNotRegistered(HiveMetaStoreBridge + .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, tableName), + "name")); assertTableIsNotRegistered(DEFAULT_DB, tableName); } @@ -903,8 +976,11 @@ public class HiveHookIT { runCommand(query); //Verify columns are not registered for one of the tables - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id")); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "name")); + assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName( + HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), "id")); + assertColumnIsNotRegistered(HiveMetaStoreBridge + .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableNames[0]), + "name")); for(int i = 0; i < numTables; i++) { assertTableIsNotRegistered(dbName, tableNames[i]); @@ -974,8 +1050,12 @@ public class HiveHookIT { query = String.format("drop view %s ", viewName); runCommand(query); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "id")); - assertColumnIsNotRegistered(HiveMetaStoreBridge.getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), "name")); + assertColumnIsNotRegistered(HiveMetaStoreBridge + .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), + "id")); + assertColumnIsNotRegistered(HiveMetaStoreBridge + .getColumnQualifiedName(HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, viewName), + "name")); assertTableIsNotRegistered(DEFAULT_DB, viewName); } @@ -1006,16 +1086,20 @@ public class HiveHookIT { @Test public void testAlterDBOwner() throws Exception { String dbName = createDatabase(); + assertDatabaseIsRegistered(dbName); + final String owner = "testOwner"; - String dbId = assertDatabaseIsRegistered(dbName); final String fmtQuery = "alter database %s set OWNER %s %s"; String query = String.format(fmtQuery, dbName, "USER", owner); runCommand(query); - assertDatabaseIsRegistered(dbName); - Referenceable entity = dgiCLient.getEntity(dbId); - Assert.assertEquals(entity.get(HiveDataModelGenerator.OWNER), owner); + assertDatabaseIsRegistered(dbName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable entity) { + 
assertEquals(entity.get(HiveDataModelGenerator.OWNER), owner); + } + }); } @Test @@ -1073,30 +1157,38 @@ public class HiveHookIT { testAlterProperties(Entity.Type.TABLE, viewName, fmtQuery); } - private void verifyEntityProperties(Entity.Type type, String entityName, Map<String, String> expectedProps, boolean checkIfNotExists) throws Exception { - - String entityId = null; - + private void verifyEntityProperties(Entity.Type type, String entityName, final Map<String, String> expectedProps, + final boolean checkIfNotExists) throws Exception { switch(type) { case TABLE: - entityId = assertTableIsRegistered(DEFAULT_DB, entityName); + assertTableIsRegistered(DEFAULT_DB, entityName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable entity) throws Exception { + verifyProperties(entity, expectedProps, checkIfNotExists); + } + }); break; case DATABASE: - entityId = assertDatabaseIsRegistered(entityName); + assertDatabaseIsRegistered(entityName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable entity) throws Exception { + verifyProperties(entity, expectedProps, checkIfNotExists); + } + }); break; } - - Referenceable ref = dgiCLient.getEntity(entityId); - verifyProperties(ref, expectedProps, checkIfNotExists); } - private void verifyTableSdProperties(String tableName, String serdeLib, Map<String, String> expectedProps) throws Exception { - String tableId = assertTableIsRegistered(DEFAULT_DB, tableName); - Referenceable tableRef = dgiCLient.getEntity(tableId); - Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC); - Struct serdeInfo = (Struct) sdRef.get("serdeInfo"); - Assert.assertEquals(serdeInfo.get("serializationLib"), serdeLib); - verifyProperties(serdeInfo, expectedProps, false); + private void verifyTableSdProperties(String tableName, final String serdeLib, final Map<String, String> expectedProps) throws Exception { + assertTableIsRegistered(DEFAULT_DB, tableName, new AssertPredicate() { + @Override + public void assertOnEntity(Referenceable tableRef) throws Exception { + Referenceable sdRef = (Referenceable) tableRef.get(HiveDataModelGenerator.STORAGE_DESC); + Struct serdeInfo = (Struct) sdRef.get("serdeInfo"); + Assert.assertEquals(serdeInfo.get("serializationLib"), serdeLib); + verifyProperties(serdeInfo, expectedProps, false); + } + }); } private void verifyProperties(Struct referenceable, Map<String, String> expectedProps, boolean checkIfNotExists) { @@ -1119,108 +1211,80 @@ public class HiveHookIT { } private String assertProcessIsRegistered(String queryStr) throws Exception { - // String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), - // normalize(queryStr)); - // assertEntityIsRegistered(dslQuery, true); - //todo replace with DSL - String typeName = HiveDataTypes.HIVE_PROCESS.getName(); - String gremlinQuery = - String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName, - normalize(queryStr)); - return assertEntityIsRegistered(gremlinQuery); + LOG.debug("Searching for process with query {}", queryStr); + return assertEntityIsRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr), null); } private void assertProcessIsNotRegistered(String queryStr) throws Exception { - // String dslQuery = String.format("%s where queryText = \"%s\"", HiveDataTypes.HIVE_PROCESS.getName(), - // normalize(queryStr)); - // assertEntityIsRegistered(dslQuery, true); - //todo replace with DSL - 
String typeName = HiveDataTypes.HIVE_PROCESS.getName(); - String gremlinQuery = - String.format("g.V.has('__typeName', '%s').has('%s.queryText', \"%s\").toList()", typeName, typeName, - normalize(queryStr)); - assertEntityIsNotRegistered(QUERY_TYPE.GREMLIN, gremlinQuery); - } - - private String normalize(String str) { - if (StringUtils.isEmpty(str)) { - return null; - } - return StringEscapeUtils.escapeJava(str.toLowerCase()); + LOG.debug("Searching for process with query {}", queryStr); + assertEntityIsNotRegistered(HiveDataTypes.HIVE_PROCESS.getName(), AtlasClient.NAME, normalize(queryStr)); } private void assertTableIsNotRegistered(String dbName, String tableName) throws Exception { LOG.debug("Searching for table {}.{}", dbName, tableName); - String query = String.format( - "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t", - HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); - assertEntityIsNotRegistered(QUERY_TYPE.DSL, query); + String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName); + assertEntityIsNotRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.NAME, tableQualifiedName); } private void assertDBIsNotRegistered(String dbName) throws Exception { - LOG.debug("Searching for database {}.{}", dbName); - String query = String.format( - "%s as d where name = '%s' and clusterName = '%s'" + " select d", - HiveDataTypes.HIVE_DB.getName(), dbName.toLowerCase(), CLUSTER_NAME); - assertEntityIsNotRegistered(QUERY_TYPE.DSL, query); + LOG.debug("Searching for database {}", dbName); + String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName); + assertEntityIsNotRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, dbQualifiedName); } private String assertTableIsRegistered(String dbName, String tableName) throws Exception { - LOG.debug("Searching for table {}.{}", dbName, tableName); - String query = String.format( - "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t", - HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); - return assertEntityIsRegistered(query, "t"); + return assertTableIsRegistered(dbName, tableName, null); } - private String getTableEntity(String dbName, String tableName) throws Exception { + private String assertTableIsRegistered(String dbName, String tableName, AssertPredicate assertPredicate) throws Exception { LOG.debug("Searching for table {}.{}", dbName, tableName); - String query = String.format( - "%s as t where tableName = '%s', db where name = '%s' and clusterName = '%s'" + " select t", - HiveDataTypes.HIVE_TABLE.getName(), tableName.toLowerCase(), dbName.toLowerCase(), CLUSTER_NAME); - return assertEntityIsRegistered(query, "t"); + String tableQualifiedName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, dbName, tableName); + return assertEntityIsRegistered(HiveDataTypes.HIVE_TABLE.getName(), AtlasClient.NAME, tableQualifiedName, + assertPredicate); } private String assertDatabaseIsRegistered(String dbName) throws Exception { + return assertDatabaseIsRegistered(dbName, null); + } + + private String assertDatabaseIsRegistered(String dbName, AssertPredicate assertPredicate) throws Exception { LOG.debug("Searching for database {}", dbName); - String query = String.format("%s where name = '%s' and clusterName = '%s'", HiveDataTypes.HIVE_DB.getName(), - dbName.toLowerCase(), 
CLUSTER_NAME); - return assertEntityIsRegistered(query); + String dbQualifiedName = HiveMetaStoreBridge.getDBQualifiedName(CLUSTER_NAME, dbName); + return assertEntityIsRegistered(HiveDataTypes.HIVE_DB.getName(), AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, + dbQualifiedName, assertPredicate); } - private String assertEntityIsRegistered(final String query, String... arg) throws Exception { - waitFor(60000, new Predicate() { + private String assertEntityIsRegistered(final String typeName, final String property, final String value, + final AssertPredicate assertPredicate) throws Exception { + waitFor(80000, new Predicate() { @Override - public boolean evaluate() throws Exception { - JSONArray results = dgiCLient.search(query); - return results.length() == 1; + public void evaluate() throws Exception { + Referenceable entity = atlasClient.getEntity(typeName, property, value); + assertNotNull(entity); + if(assertPredicate != null) { + assertPredicate.assertOnEntity(entity); + } } }); - - String column = (arg.length > 0) ? arg[0] : "_col_0"; - - JSONArray results = dgiCLient.search(query); - JSONObject row = results.getJSONObject(0); - if (row.has("__guid")) { - return row.getString("__guid"); - } else if (row.has("$id$")) { - return row.getJSONObject("$id$").getString("id"); - } else { - return row.getJSONObject(column).getString("id"); - } + Referenceable entity = atlasClient.getEntity(typeName, property, value); + return entity.getId()._getId(); } - private void assertEntityIsNotRegistered(QUERY_TYPE queryType, String query) throws Exception { - JSONArray results = null; - switch(queryType) { - case DSL : - results = dgiCLient.searchByDSL(query); - break; - case GREMLIN : - results = dgiCLient.searchByGremlin(query); - break; - } - Assert.assertEquals(results.length(), 0); + private void assertEntityIsNotRegistered(final String typeName, final String property, final String value) throws Exception { + waitFor(80000, new Predicate() { + @Override + public void evaluate() throws Exception { + try { + atlasClient.getEntity(typeName, property, value); + } catch (AtlasServiceException e) { + if(e.getStatus() == ClientResponse.Status.NOT_FOUND) { + return; + } + } + fail(String.format("Entity was not supposed to exist for typeName = %s, attributeName = %s, " + + "attributeValue = %s", typeName, property, value)); + } + }); } @Test @@ -1236,13 +1300,13 @@ public class HiveHookIT { String table2Id = assertTableIsRegistered(db2, table2); String datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, db2, table2); - JSONObject response = dgiCLient.getInputGraph(datasetName); + JSONObject response = atlasClient.getInputGraph(datasetName); JSONObject vertices = response.getJSONObject("values").getJSONObject("vertices"); Assert.assertTrue(vertices.has(table1Id)); Assert.assertTrue(vertices.has(table2Id)); datasetName = HiveMetaStoreBridge.getTableQualifiedName(CLUSTER_NAME, DEFAULT_DB, table1); - response = dgiCLient.getOutputGraph(datasetName); + response = atlasClient.getOutputGraph(datasetName); vertices = response.getJSONObject("values").getJSONObject("vertices"); Assert.assertTrue(vertices.has(table1Id)); Assert.assertTrue(vertices.has(table2Id)); @@ -1255,15 +1319,18 @@ public class HiveHookIT { runCommand("show transactions"); } - public interface Predicate { + public interface AssertPredicate { + void assertOnEntity(Referenceable entity) throws Exception; + } + public interface Predicate { /** * Perform a predicate evaluation. * * @return the boolean result of the evaluation. 
* @throws Exception thrown if the predicate evaluation could not evaluate. */ - boolean evaluate() throws Exception; + void evaluate() throws Exception; } /** @@ -1276,13 +1343,17 @@ public class HiveHookIT { ParamChecker.notNull(predicate, "predicate"); long mustEnd = System.currentTimeMillis() + timeout; - boolean eval; - while (!(eval = predicate.evaluate()) && System.currentTimeMillis() < mustEnd) { - LOG.info("Waiting up to {} msec", mustEnd - System.currentTimeMillis()); - Thread.sleep(100); - } - if (!eval) { - throw new Exception("Waiting timed out after " + timeout + " msec"); + while (true) { + try { + predicate.evaluate(); + return; + } catch(Error | Exception e) { + if (System.currentTimeMillis() >= mustEnd) { + fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e); + } + LOG.debug("Waiting up to " + (mustEnd - System.currentTimeMillis()) + " msec as assertion failed", e); + Thread.sleep(300); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/client/src/main/java/org/apache/atlas/AtlasClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/atlas/AtlasClient.java b/client/src/main/java/org/apache/atlas/AtlasClient.java index cc87706..22a1726 100755 --- a/client/src/main/java/org/apache/atlas/AtlasClient.java +++ b/client/src/main/java/org/apache/atlas/AtlasClient.java @@ -57,6 +57,7 @@ import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED; */ public class AtlasClient { private static final Logger LOG = LoggerFactory.getLogger(AtlasClient.class); + public static final String NAME = "name"; public static final String GUID = "GUID"; public static final String TYPE = "type"; @@ -403,6 +404,7 @@ public class AtlasClient { * @throws AtlasServiceException */ public List<String> createType(String typeAsJson) throws AtlasServiceException { + LOG.debug("Creating type definition: {}", typeAsJson); JSONObject response = callAPI(API.CREATE_TYPE, typeAsJson); return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() { @Override @@ -429,6 +431,7 @@ public class AtlasClient { * @throws AtlasServiceException */ public List<String> updateType(String typeAsJson) throws AtlasServiceException { + LOG.debug("Updating tyep definition: {}", typeAsJson); JSONObject response = callAPI(API.UPDATE_TYPE, typeAsJson); return extractResults(response, AtlasClient.TYPES, new ExtractOperation<String, JSONObject>() { @Override @@ -474,6 +477,7 @@ public class AtlasClient { * @throws AtlasServiceException */ public JSONArray createEntity(JSONArray entities) throws AtlasServiceException { + LOG.debug("Creating entities: {}", entities); JSONObject response = callAPI(API.CREATE_ENTITY, entities.toString()); try { return response.getJSONArray(GUID); @@ -522,6 +526,7 @@ public class AtlasClient { public JSONArray updateEntities(Collection<Referenceable> entities) throws AtlasServiceException { JSONArray entitiesArray = getEntitiesArray(entities); + LOG.debug("Updating entities: {}", entitiesArray); JSONObject response = callAPI(API.UPDATE_ENTITY, entitiesArray.toString()); try { return response.getJSONArray(GUID); @@ -538,6 +543,7 @@ public class AtlasClient { * @param value property value */ public void updateEntityAttribute(final String guid, final String attribute, String value) throws AtlasServiceException { + LOG.debug("Updating entity id: {}, attribute name: {}, attribute value: {}", guid, attribute, value); 
callAPIWithRetries(API.UPDATE_ENTITY_PARTIAL, value, new ResourceCreator() { @Override public WebResource createResource() { @@ -555,7 +561,7 @@ public class AtlasClient { for (int i = 0; i < getNumberOfRetries(); i++) { WebResource resource = resourceCreator.createResource(); try { - LOG.info("using resource {} for {} times", resource.getURI(), i); + LOG.debug("Using resource {} for {} times", resource.getURI(), i); JSONObject result = callAPIWithResource(api, resource, requestObject); return result; } catch (ClientHandlerException che) { @@ -578,6 +584,7 @@ public class AtlasClient { */ public void updateEntity(String guid, Referenceable entity) throws AtlasServiceException { String entityJson = InstanceSerialization.toJson(entity, true); + LOG.debug("Updating entity id {} with {}", guid, entityJson); callAPI(API.UPDATE_ENTITY_PARTIAL, entityJson, guid); } @@ -904,6 +911,7 @@ public class AtlasClient { clientResponse = resource.accept(JSON_MEDIA_TYPE).type(JSON_MEDIA_TYPE) .method(api.getMethod(), ClientResponse.class, requestObject); + LOG.debug("API {} returned status {}", resource.getURI(), clientResponse.getStatus()); if (clientResponse.getStatus() == api.getExpectedStatus().getStatusCode()) { String responseAsString = clientResponse.getEntity(String.class); try { http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/common/src/main/java/org/apache/atlas/ApplicationProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/atlas/ApplicationProperties.java b/common/src/main/java/org/apache/atlas/ApplicationProperties.java index ca72ffd..6a4dca3 100644 --- a/common/src/main/java/org/apache/atlas/ApplicationProperties.java +++ b/common/src/main/java/org/apache/atlas/ApplicationProperties.java @@ -91,4 +91,14 @@ public final class ApplicationProperties extends PropertiesConfiguration { public static Configuration getSubsetConfiguration(Configuration inConf, String prefix) { return inConf.subset(prefix); } + + public static Class getClass(String propertyName, String defaultValue) { + try { + Configuration configuration = get(); + String propertyValue = configuration.getString(propertyName, defaultValue); + return Class.forName(propertyValue); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 64446f4..61c6e8b 100644 --- a/release-log.txt +++ b/release-log.txt @@ -3,6 +3,7 @@ Apache Atlas Release Notes --trunk - unreleased INCOMPATIBLE CHANGES: +ATLAS-622 Introduce soft delete (shwethags) ATLAS-494 UI Authentication (nixonrodrigues via shwethags) ATLAS-621 Introduce entity state in Id object (shwethags) ATLAS-474 Server does not start if the type is updated with same super type class information (dkantor via shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java index 75d14f0..8dae968 100755 --- a/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java +++ b/repository/src/main/java/org/apache/atlas/RepositoryMetadataModule.java @@ -35,9 +35,11 
@@ import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.audit.EntityAuditListener; import org.apache.atlas.repository.audit.EntityAuditRepository; import org.apache.atlas.repository.audit.HBaseBasedAuditRepository; +import org.apache.atlas.repository.graph.DeleteHandler; import org.apache.atlas.repository.graph.GraphBackedMetadataRepository; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.atlas.repository.graph.SoftDeleteHandler; import org.apache.atlas.repository.graph.TitanGraphProvider; import org.apache.atlas.repository.typestore.GraphBackedTypeStore; import org.apache.atlas.repository.typestore.ITypeStore; @@ -85,6 +87,8 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { bindAuditRepository(binder()); + bind(DeleteHandler.class).to(getDeleteHandler()).asEagerSingleton(); + //Add EntityAuditListener as EntityChangeListener Multibinder<EntityChangeListener> entityChangeListenerBinder = Multibinder.newSetBinder(binder(), EntityChangeListener.class); @@ -103,4 +107,11 @@ public class RepositoryMetadataModule extends com.google.inject.AbstractModule { Multibinder<Service> serviceBinder = Multibinder.newSetBinder(binder, Service.class); serviceBinder.addBinding().to(HBaseBasedAuditRepository.class); } + + private static final String DELETE_HANDLER_IMPLEMENTATION_PROPERTY = "atlas.DeleteHandler.impl"; + + private Class<? extends DeleteHandler> getDeleteHandler() { + return ApplicationProperties.getClass(DELETE_HANDLER_IMPLEMENTATION_PROPERTY, + SoftDeleteHandler.class.getName()); + } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java index 7f77feb..5b4bdbf 100644 --- a/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java +++ b/repository/src/main/java/org/apache/atlas/repository/audit/EntityAuditListener.java @@ -45,7 +45,7 @@ public class EntityAuditListener implements EntityChangeListener { @Override public void onEntitiesAdded(Collection<ITypedReferenceableInstance> entities) throws AtlasException { List<EntityAuditEvent> events = new ArrayList<>(); - long currentTime = System.currentTimeMillis(); + long currentTime = RequestContext.get().getRequestTime(); for (ITypedReferenceableInstance entity : entities) { EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_CREATE, "Created: " + InstanceSerialization.toJson(entity, true)); @@ -62,7 +62,7 @@ public class EntityAuditListener implements EntityChangeListener { @Override public void onEntitiesUpdated(Collection<ITypedReferenceableInstance> entities) throws AtlasException { List<EntityAuditEvent> events = new ArrayList<>(); - long currentTime = System.currentTimeMillis(); + long currentTime = RequestContext.get().getRequestTime(); for (ITypedReferenceableInstance entity : entities) { EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_UPDATE, "Updated: " + InstanceSerialization.toJson(entity, true)); @@ -73,7 +73,7 @@ public class EntityAuditListener implements EntityChangeListener { @Override public void 
     public void onTraitAdded(ITypedReferenceableInstance entity, IStruct trait) throws AtlasException {
-        EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+        EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(),
                 EntityAuditEvent.EntityAuditAction.TAG_ADD, "Added trait: " + InstanceSerialization.toJson(trait, true));
         auditRepository.putEvents(event);
@@ -81,7 +81,7 @@ public class EntityAuditListener implements EntityChangeListener {
     @Override
     public void onTraitDeleted(ITypedReferenceableInstance entity, String traitName) throws AtlasException {
-        EntityAuditEvent event = createEvent(entity, System.currentTimeMillis(),
+        EntityAuditEvent event = createEvent(entity, RequestContext.get().getRequestTime(),
                 EntityAuditEvent.EntityAuditAction.TAG_DELETE, "Deleted trait: " + traitName);
         auditRepository.putEvents(event);
     }
@@ -89,7 +89,7 @@ public class EntityAuditListener implements EntityChangeListener {
     @Override
     public void onEntitiesDeleted(Collection<ITypedReferenceableInstance> entities) throws AtlasException {
         List<EntityAuditEvent> events = new ArrayList<>();
-        long currentTime = System.currentTimeMillis();
+        long currentTime = RequestContext.get().getRequestTime();
         for (ITypedReferenceableInstance entity : entities) {
             EntityAuditEvent event = createEvent(entity, currentTime, EntityAuditEvent.EntityAuditAction.ENTITY_DELETE,
                     "Deleted entity");

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java b/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java
index da2ad9a..d905c01 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/AtlasEdgeLabel.java
@@ -31,9 +31,6 @@ public class AtlasEdgeLabel {
     private final String qualifiedAttributeName_;
 
     public AtlasEdgeLabel(String edgeLabel) {
-        if (!edgeLabel.startsWith(GraphHelper.EDGE_LABEL_PREFIX)) {
-            throw new IllegalArgumentException("Invalid edge label " + edgeLabel + ": missing required prefix " + GraphHelper.EDGE_LABEL_PREFIX);
-        }
         String labelWithoutPrefix = edgeLabel.substring(GraphHelper.EDGE_LABEL_PREFIX.length());
         String[] fields = labelWithoutPrefix.split("\\.", 3);
         if (fields.length < 2 || fields.length > 3) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/40ee9492/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
new file mode 100644
index 0000000..369a5d5
--- /dev/null
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/DeleteHandler.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository.graph;
+
+import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.RequestContext;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.typesystem.persistence.Id;
+import org.apache.atlas.typesystem.types.AttributeInfo;
+import org.apache.atlas.typesystem.types.DataTypes;
+import org.apache.atlas.typesystem.types.FieldMapping;
+import org.apache.atlas.typesystem.types.HierarchicalType;
+import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.StructType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static org.apache.atlas.repository.graph.GraphHelper.EDGE_LABEL_PREFIX;
+import static org.apache.atlas.repository.graph.GraphHelper.string;
+
+public abstract class DeleteHandler {
+    public static final Logger LOG = LoggerFactory.getLogger(DeleteHandler.class);
+
+    private static final GraphHelper graphHelper = GraphHelper.getInstance();
+
+    protected TypeSystem typeSystem;
+    private boolean shouldUpdateReverseAttribute;
+
+    public DeleteHandler(TypeSystem typeSystem, boolean shouldUpdateReverseAttribute) {
+        this.typeSystem = typeSystem;
+        this.shouldUpdateReverseAttribute = shouldUpdateReverseAttribute;
+
+    }
+
+    /**
+     * Deletes the entity vertex - deletes the traits and all the references
+     * @param instanceVertex
+     * @throws AtlasException
+     */
+    public void deleteEntity(Vertex instanceVertex) throws AtlasException {
+        String guid = GraphHelper.getIdFromVertex(instanceVertex);
+        String typeName = GraphHelper.getTypeName(instanceVertex);
+        RequestContext.get().recordDeletedEntity(guid, typeName);
+
+        deleteAllTraits(instanceVertex);
+
+        deleteTypeVertex(instanceVertex);
+    }
+
+    protected abstract void deleteEdge(Edge edge) throws AtlasException;
+
+    /**
+     * Deletes a type vertex - can be an entity (class type) or just a vertex (struct/trait type)
+     * @param instanceVertex
+     * @param typeCategory
+     * @throws AtlasException
+     */
+    protected void deleteTypeVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory) throws AtlasException {
+        switch (typeCategory) {
+            case STRUCT:
+            case TRAIT:
+                deleteTypeVertex(instanceVertex);
+                break;
+
+            case CLASS:
+                deleteEntity(instanceVertex);
+                break;
+
+            default:
+                throw new IllegalStateException("Type category " + typeCategory + " not handled");
+        }
+    }
+
+    /**
+     * Deleting any type vertex. Goes over the complex attributes and removes the references
+     * @param instanceVertex
+     * @throws AtlasException
+     */
+    protected void deleteTypeVertex(Vertex instanceVertex) throws AtlasException {
+        LOG.debug("Deleting {}", string(instanceVertex));
+        String typeName = GraphHelper.getTypeName(instanceVertex);
+        IDataType type = typeSystem.getDataType(IDataType.class, typeName);
+        FieldMapping fieldMapping = getFieldMapping(type);
+
+        for (AttributeInfo attributeInfo : fieldMapping.fields.values()) {
+            LOG.debug("Deleting attribute {} for {}", attributeInfo.name, string(instanceVertex));
+            String edgeLabel = GraphHelper.getEdgeLabel(type, attributeInfo);
+
+            switch (attributeInfo.dataType().getTypeCategory()) {
+                case CLASS:
+                    //If it's a class attribute, delete the reference
+                    deleteReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.CLASS, attributeInfo.isComposite);
+                    break;
+
+                case STRUCT:
+                    //If it's a struct attribute, delete the reference
+                    deleteReference(instanceVertex, edgeLabel, DataTypes.TypeCategory.STRUCT);
+                    break;
+
+                case ARRAY:
+                    //For an array attribute, if the element is struct/class, delete all the references
+                    IDataType elementType = ((DataTypes.ArrayType) attributeInfo.dataType()).getElemType();
+                    DataTypes.TypeCategory elementTypeCategory = elementType.getTypeCategory();
+                    if (elementTypeCategory == DataTypes.TypeCategory.STRUCT ||
+                            elementTypeCategory == DataTypes.TypeCategory.CLASS) {
+                        Iterator<Edge> edges = GraphHelper.getOutGoingEdgesByLabel(instanceVertex, edgeLabel);
+                        if (edges != null) {
+                            while (edges.hasNext()) {
+                                Edge edge = edges.next();
+                                deleteReference(edge, elementType, attributeInfo);
+                            }
+                        }
+                    }
+                    break;
+
+                case MAP:
+                    //For a map attribute, if the value type is struct/class, delete all the references
+                    DataTypes.MapType mapType = (DataTypes.MapType) attributeInfo.dataType();
+                    DataTypes.TypeCategory valueTypeCategory = mapType.getValueType().getTypeCategory();
+                    String propertyName = GraphHelper.getQualifiedFieldName(type, attributeInfo.name);
+
+                    if (valueTypeCategory == DataTypes.TypeCategory.STRUCT ||
+                            valueTypeCategory == DataTypes.TypeCategory.CLASS) {
+                        List<String> keys = instanceVertex.getProperty(propertyName);
+                        if (keys != null) {
+                            for (String key : keys) {
+                                String mapEdgeLabel = GraphHelper.getQualifiedNameForMapKey(edgeLabel, key);
+                                deleteReference(instanceVertex, mapEdgeLabel, valueTypeCategory, attributeInfo.isComposite);
+                            }
+                        }
+                    }
+            }
+        }
+
+        deleteVertex(instanceVertex, type.getTypeCategory());
+    }
+
+    public void deleteReference(Edge edge, IDataType dataType, AttributeInfo attributeInfo) throws AtlasException {
+        deleteReference(edge, dataType.getTypeCategory(), attributeInfo.isComposite);
+    }
+
+    public void deleteReference(Edge edge, DataTypes.TypeCategory typeCategory, boolean isComposite) throws AtlasException {
+        LOG.debug("Deleting {}", string(edge));
+        if (typeCategory == DataTypes.TypeCategory.STRUCT || typeCategory == DataTypes.TypeCategory.TRAIT
+                || (typeCategory == DataTypes.TypeCategory.CLASS && isComposite)) {
+            //If the vertex is of type struct/trait, delete the edge and then the reference vertex as the vertex is not shared by any other entities.
+            //If the vertex is of type class, and it's a composite attribute, this reference vertex's lifecycle is controlled
+            //through this delete, hence delete the edge and the reference vertex.
+            Vertex vertexForDelete = edge.getVertex(Direction.IN);
+
+            //If deleting the edge and then the in vertex, reverse attribute shouldn't be updated
+            deleteEdge(edge, false);
+            deleteTypeVertex(vertexForDelete, typeCategory);
+        } else {
+            //If the vertex is of type class, and it's not a composite attribute, the reference vertex's lifecycle is not controlled
+            //through this delete. Hence just remove the reference edge. Leave the reference vertex as is
+
+            //If deleting just the edge, reverse attribute should be updated for any references
+            //For example, for the department type system, if the person's manager edge is deleted, subordinates of manager should be updated
+            deleteEdge(edge, true);
+        }
+    }
+
+    public void deleteReference(Vertex instanceVertex, String edgeLabel, DataTypes.TypeCategory typeCategory)
+            throws AtlasException {
+        deleteReference(instanceVertex, edgeLabel, typeCategory, false);
+    }
+
+    public void deleteReference(Vertex instanceVertex, String edgeLabel, DataTypes.TypeCategory typeCategory,
+                                boolean isComposite) throws AtlasException {
+        Edge edge = GraphHelper.getEdgeForLabel(instanceVertex, edgeLabel);
+        if (edge != null) {
+            deleteReference(edge, typeCategory, isComposite);
+        }
+    }
+
+    protected void deleteEdge(Edge edge, boolean updateReverseAttribute) throws AtlasException {
+        //update reverse attribute
+        if (updateReverseAttribute) {
+            AttributeInfo attributeInfo = getAttributeForEdge(edge.getLabel());
+            if (attributeInfo.reverseAttributeName != null) {
+                deleteEdgeBetweenVertices(edge.getVertex(Direction.IN), edge.getVertex(Direction.OUT),
+                        attributeInfo.reverseAttributeName);
+            }
+        }
+
+        deleteEdge(edge);
+    }
+
+    protected void deleteVertex(Vertex instanceVertex, DataTypes.TypeCategory typeCategory) throws AtlasException {
+        //Update external references (incoming edges) to this vertex
+        LOG.debug("Setting the external references to {} to null (removing edges)", string(instanceVertex));
+        Iterator<Edge> edges = instanceVertex.getEdges(Direction.IN).iterator();
+
+        while (edges.hasNext()) {
+            Edge edge = edges.next();
+            String edgeState = edge.getProperty(Constants.STATE_PROPERTY_KEY);
+            if (Id.EntityState.ACTIVE.name().equals(edgeState)) {
+                //Delete only the active edge references
+                AttributeInfo attribute = getAttributeForEdge(edge.getLabel());
+                deleteEdgeBetweenVertices(edge.getVertex(Direction.OUT), edge.getVertex(Direction.IN), attribute.name);
+                deleteEdge(edge);
+            }
+        }
+        _deleteVertex(instanceVertex);
+    }
+
+    protected abstract void _deleteVertex(Vertex instanceVertex);
+
+    /**
+     * Deletes the edge between outVertex and inVertex. The edge is for attribute attributeName of outVertex
+     * @param outVertex
+     * @param inVertex
+     * @param attributeName
+     * @throws AtlasException
+     */
+    protected void deleteEdgeBetweenVertices(Vertex outVertex, Vertex inVertex, String attributeName) throws AtlasException {
+        LOG.debug("Removing edge from {} to {} with attribute name {}", string(outVertex), string(inVertex),
+                attributeName);
+        String typeName = GraphHelper.getTypeName(outVertex);
+        String outId = GraphHelper.getIdFromVertex(outVertex);
+        if (outId != null && RequestContext.get().getDeletedEntityIds().contains(outId)) {
+            //If the reference vertex is marked for deletion, skip updating the reference
+            return;
+        }
+
+        IDataType type = typeSystem.getDataType(IDataType.class, typeName);
+        AttributeInfo attributeInfo = getFieldMapping(type).fields.get(attributeName);
+        String propertyName = GraphHelper.getQualifiedFieldName(type, attributeName);
+        String edgeLabel = EDGE_LABEL_PREFIX + propertyName;
+        Edge edge = null;
+
+        switch (attributeInfo.dataType().getTypeCategory()) {
+            case CLASS:
+                //If it's a class attribute, it's the only edge between the two vertices
+                //TODO need to enable this
+                //  if (refAttributeInfo.multiplicity == Multiplicity.REQUIRED) {
+                //      throw new AtlasException("Can't set attribute " + refAttributeName + " to null as its required attribute");
+                //  }
+                edge = GraphHelper.getEdgeForLabel(outVertex, edgeLabel);
+                break;
+
+            case ARRAY:
+                //If it's an array attribute, find the right edge between the two vertices and update the array property
+                List<String> elements = outVertex.getProperty(propertyName);
+                if (elements != null) {
+                    elements = new ArrayList<>(elements);   //Make a copy, else list.remove reflects on titan.getProperty()
+                    for (String elementEdgeId : elements) {
+                        Edge elementEdge = graphHelper.getEdgeById(elementEdgeId);
+                        if (elementEdge == null) {
+                            continue;
+                        }
+
+                        Vertex elementVertex = elementEdge.getVertex(Direction.IN);
+                        if (elementVertex.getId().toString().equals(inVertex.getId().toString())) {
+                            edge = elementEdge;
+
+                            if (shouldUpdateReverseAttribute || attributeInfo.isComposite) {
+                                //if it's a composite attribute, remove the reference as well; else, just remove the edge
+                                //for example, when a table is deleted, the process still references the table,
+                                //but when a column is deleted, the table will not reference the deleted column
+                                LOG.debug("Removing edge {} from the array attribute {}", string(elementEdge),
+                                        attributeName);
+                                elements.remove(elementEdge.getId().toString());
+                                GraphHelper.setProperty(outVertex, propertyName, elements);
+                            }
+                            break;
+                        }
+                    }
+                }
+                break;
+
+            case MAP:
+                //If it's a map attribute, find the right edge between the two vertices and update the map property
+                List<String> keys = outVertex.getProperty(propertyName);
+                if (keys != null) {
+                    keys = new ArrayList<>(keys);   //Make a copy, else list.remove reflects on titan.getProperty()
+                    for (String key : keys) {
+                        String keyPropertyName = propertyName + "." + key;
+                        String mapEdgeId = outVertex.getProperty(keyPropertyName);
+                        Edge mapEdge = graphHelper.getEdgeById(mapEdgeId);
+                        Vertex mapVertex = mapEdge.getVertex(Direction.IN);
+                        if (mapVertex.getId().toString().equals(inVertex.getId().toString())) {
+                            edge = mapEdge;
+
+                            if (shouldUpdateReverseAttribute || attributeInfo.isComposite) {
+                                //remove this key
+                                LOG.debug("Removing edge {}, key {} from the map attribute {}", string(mapEdge), key,
+                                        attributeName);
+                                keys.remove(key);
+                                GraphHelper.setProperty(outVertex, propertyName, keys);
+                                GraphHelper.setProperty(outVertex, keyPropertyName, null);
+                            }
+                            break;
+                        }
+                    }
+                }
+                break;
+
+            case STRUCT:
+            case TRAIT:
+                break;
+
+            default:
+                throw new IllegalStateException("There can't be an edge from " + string(outVertex) + " to "
+                        + string(inVertex) + " with attribute name " + attributeName + " which is not a class/array/map attribute");
+        }
+
+        if (edge != null) {
+            deleteEdge(edge);
+            GraphHelper.setProperty(outVertex, Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY,
+                    RequestContext.get().getRequestTime());
+        }
+    }
+
+    protected AttributeInfo getAttributeForEdge(String edgeLabel) throws AtlasException {
+        AtlasEdgeLabel atlasEdgeLabel = new AtlasEdgeLabel(edgeLabel);
+        IDataType referenceType = typeSystem.getDataType(IDataType.class, atlasEdgeLabel.getTypeName());
+        return getFieldMapping(referenceType).fields.get(atlasEdgeLabel.getAttributeName());
+    }
+
+    protected FieldMapping getFieldMapping(IDataType type) {
+        switch (type.getTypeCategory()) {
+            case CLASS:
+            case TRAIT:
+                return ((HierarchicalType)type).fieldMapping();
+
+            case STRUCT:
+                return ((StructType)type).fieldMapping();
+
+            default:
+                throw new IllegalStateException("Type " + type + " doesn't have any fields!");
+        }
+    }
+
+    /**
+     * Delete all traits from the specified vertex.
+     * @param instanceVertex
+     * @throws AtlasException
+     */
+    private void deleteAllTraits(Vertex instanceVertex) throws AtlasException {
+        List<String> traitNames = GraphHelper.getTraitNames(instanceVertex);
+        LOG.debug("Deleting traits {} for {}", traitNames, string(instanceVertex));
+        String typeName = GraphHelper.getTypeName(instanceVertex);
+
+        for (String traitNameToBeDeleted : traitNames) {
+            String relationshipLabel = GraphHelper.getTraitLabel(typeName, traitNameToBeDeleted);
+            deleteReference(instanceVertex, relationshipLabel, DataTypes.TypeCategory.TRAIT);
+        }
+    }
+}
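
Note: DeleteHandler keeps all of the traversal logic (composite references such as a table's columns are deleted along with the entity, while non-composite references such as the table's database are merely unlinked) and leaves only two primitives abstract: deleteEdge(Edge) and _deleteVertex(Vertex). A minimal sketch of what the soft variant can look like, using the raw Blueprints setProperty API; the SoftDeleteHandler committed here may differ in detail:

    package org.apache.atlas.repository.graph;

    import com.tinkerpop.blueprints.Edge;
    import com.tinkerpop.blueprints.Vertex;
    import org.apache.atlas.AtlasException;
    import org.apache.atlas.RequestContext;
    import org.apache.atlas.repository.Constants;
    import org.apache.atlas.typesystem.persistence.Id;
    import org.apache.atlas.typesystem.types.TypeSystem;

    public class SoftDeleteHandlerSketch extends DeleteHandler {

        public SoftDeleteHandlerSketch(TypeSystem typeSystem) {
            super(typeSystem, false);   // edges survive a soft delete, so reverse attributes stay as they are
        }

        @Override
        protected void deleteEdge(Edge edge) throws AtlasException {
            // Flag the edge DELETED instead of removing it from the graph
            edge.setProperty(Constants.STATE_PROPERTY_KEY, Id.EntityState.DELETED.name());
            edge.setProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
        }

        @Override
        protected void _deleteVertex(Vertex instanceVertex) {
            // The vertex likewise stays in the graph, flagged DELETED
            instanceVertex.setProperty(Constants.STATE_PROPERTY_KEY, Id.EntityState.DELETED.name());
            instanceVertex.setProperty(Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY, RequestContext.get().getRequestTime());
        }
    }

A hard delete would instead pass true for shouldUpdateReverseAttribute and physically remove the edge and vertex, which is exactly why deleteEdgeBetweenVertices above rewrites the array and map properties only when shouldUpdateReverseAttribute or isComposite is set.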
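The delete behaviour is pluggable: RepositoryMetadataModule above binds DeleteHandler to the class named by the atlas.DeleteHandler.impl property, falling back to SoftDeleteHandler when the property is unset. As a sketch (the property name and default come straight from the diff; the fully-qualified name below assumes the HardDeleteHandler class this commit adds alongside SoftDeleteHandler), a deployment that wants entities physically removed would set, in atlas-application.properties:

    atlas.DeleteHandler.impl=org.apache.atlas.repository.graph.HardDeleteHandler

With the soft-delete default, nothing is removed from the graph; vertices and edges are flagged via Constants.STATE_PROPERTY_KEY as Id.EntityState.DELETED, which is also why deleteVertex above unlinks only incoming edges whose state is still ACTIVE.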
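Several of the changes above hang off the new RequestContext: EntityAuditListener now stamps every audit event with RequestContext.get().getRequestTime() so that all events raised by one server request share a single timestamp, deleteEntity records each deleted guid, and deleteEdgeBetweenVertices consults getDeletedEntityIds() to skip reverse-attribute updates on vertices that are themselves being deleted. A thread-local sketch limited to the members these call sites use; the RequestContext.java added by this commit manages creation and cleanup per request and may expose more:

    package org.apache.atlas;

    import java.util.LinkedHashSet;
    import java.util.Set;

    public class RequestContextSketch {
        // One context per thread; the real class is created at the start of a request and cleared at the end
        private static final ThreadLocal<RequestContextSketch> CURRENT = new ThreadLocal<RequestContextSketch>() {
            @Override
            protected RequestContextSketch initialValue() {
                return new RequestContextSketch();
            }
        };

        private final long requestTime = System.currentTimeMillis();   // fixed once for the whole request
        private final Set<String> deletedEntityIds = new LinkedHashSet<>();

        public static RequestContextSketch get() {
            return CURRENT.get();
        }

        public long getRequestTime() {
            return requestTime;
        }

        public void recordDeletedEntity(String guid, String typeName) {
            deletedEntityIds.add(guid);   // the typeName parameter is assumed to feed delete notifications
        }

        public Set<String> getDeletedEntityIds() {
            return deletedEntityIds;
        }
    }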
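One subtlety worth calling out in deleteEdgeBetweenVertices: both the ARRAY and MAP branches copy the stored list before mutating it, because, as the in-line comment warns, calling list.remove on the value returned by Titan's getProperty() would reflect back into the stored property. The pattern in isolation, with a hypothetical property key and edge id:

    import com.tinkerpop.blueprints.Vertex;

    import java.util.ArrayList;
    import java.util.List;

    public class ArrayPropertyUpdateSketch {
        // Property key and edge id below are hypothetical, purely for illustration.
        static void dropEdgeIdFromArray(Vertex outVertex) {
            List<String> stored = outVertex.getProperty("hive_table.columns"); // edge ids as persisted
            List<String> updated = new ArrayList<>(stored);                    // defensive copy
            updated.remove("edge-id-to-drop");                                 // mutate the copy only...
            outVertex.setProperty("hive_table.columns", updated);              // ...then write back in one go
        }
    }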
