Repository: incubator-atlas Updated Branches: refs/heads/master 08f569039 -> a96424a1a
ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags) Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a96424a1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a96424a1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a96424a1 Branch: refs/heads/master Commit: a96424a1ad566b8d13c18bd1d1694ffd0d33844c Parents: 08f5690 Author: Shwetha GS <[email protected]> Authored: Fri Oct 21 11:37:37 2016 +0530 Committer: Shwetha GS <[email protected]> Committed: Fri Oct 21 11:37:37 2016 +0530 ---------------------------------------------------------------------- release-log.txt | 1 + .../atlas/discovery/DataSetLineageService.java | 61 ++++++++++--------- .../org/apache/atlas/BaseRepositoryTest.java | 18 +++--- .../discovery/DataSetLineageServiceTest.java | 64 ++++++++++++-------- 4 files changed, 78 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 88af9fc..56ef736 100644 --- a/release-log.txt +++ b/release-log.txt @@ -9,6 +9,7 @@ ATLAS-1060 Add composite indexes for exact match performance improvements for al ATLAS-1127 Modify creation and modification timestamps to Date instead of Long(sumasai) ALL CHANGES: +ATLAS-1207 Dataset exists query in lineage APIs takes longer (shwethags) ATLAS-1232 added preCreate(), preDelete() in typedef persistence, to enable edge creation for references in a later stage (mneethiraj) ATLAS-1183 UI: help link should point to atlas website (kevalbhatt via shwethags) ATLAS-1182 Hive Column level lineage docs (svimal2106 via shwethags) http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java index 4359264..c3fd72b 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/DataSetLineageService.java @@ -18,9 +18,6 @@ package org.apache.atlas.discovery; -import javax.inject.Inject; -import javax.inject.Singleton; - import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasException; @@ -28,25 +25,31 @@ import org.apache.atlas.AtlasProperties; import org.apache.atlas.GraphTransaction; import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy; import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; -import org.apache.atlas.query.GremlinQueryResult; import org.apache.atlas.query.InputLineageClosureQuery; import org.apache.atlas.query.OutputLineageClosureQuery; import org.apache.atlas.query.QueryParams; +import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.graph.AtlasGraphProvider; +import org.apache.atlas.repository.graph.GraphHelper; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; import org.apache.atlas.typesystem.exception.EntityNotFoundException; import org.apache.atlas.typesystem.exception.SchemaNotFoundException; -import org.apache.atlas.typesystem.persistence.ReferenceableInstance; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.TypeUtils; import org.apache.atlas.utils.ParamChecker; import org.apache.commons.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.Option; import scala.Some; import scala.collection.immutable.List; +import javax.inject.Inject; +import javax.inject.Singleton; +import java.util.Iterator; + /** * Hive implementation of Lineage service interface. */ @@ -66,10 +69,6 @@ public class DataSetLineageService implements LineageService { private static final String HIVE_PROCESS_INPUT_ATTRIBUTE_NAME = "inputs"; private static final String HIVE_PROCESS_OUTPUT_ATTRIBUTE_NAME = "outputs"; - private static final String DATASET_EXISTS_QUERY = AtlasClient.DATA_SET_SUPER_TYPE + " where __guid = '%s'"; - private static final String DATASET_NAME_EXISTS_QUERY = - AtlasClient.DATA_SET_SUPER_TYPE + " where " + AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME + "='%s' and __state = 'ACTIVE'"; - private static final Configuration propertiesConf; static { @@ -104,8 +103,8 @@ public class DataSetLineageService implements LineageService { public String getOutputsGraph(String datasetName) throws AtlasException { LOG.info("Fetching lineage outputs graph for datasetName={}", datasetName); datasetName = ParamChecker.notEmpty(datasetName, "dataset name"); - ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName); - return getOutputsGraphForId(datasetInstance.getId()._getId()); + TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName); + return getOutputsGraphForId(typeIdPair.right); } /** @@ -119,8 +118,8 @@ public class DataSetLineageService implements LineageService { public String getInputsGraph(String tableName) throws AtlasException { LOG.info("Fetching lineage inputs graph for tableName={}", tableName); tableName = ParamChecker.notEmpty(tableName, "table name"); - ReferenceableInstance datasetInstance = validateDatasetNameExists(tableName); - return getInputsGraphForId(datasetInstance.getId()._getId()); + TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(tableName); + return getInputsGraphForId(typeIdPair.right); } @Override @@ -169,9 +168,9 @@ public class DataSetLineageService implements LineageService { public String getSchema(String datasetName) throws AtlasException { datasetName = ParamChecker.notEmpty(datasetName, "table name"); LOG.info("Fetching schema for tableName={}", datasetName); - ReferenceableInstance datasetInstance = validateDatasetNameExists(datasetName); + TypeUtils.Pair<String, String> typeIdPair = validateDatasetNameExists(datasetName); - return getSchemaForId(datasetInstance.getTypeName(), datasetInstance.getId()._getId()); + return getSchemaForId(typeIdPair.left, typeIdPair.right); } private String getSchemaForId(String typeName, String guid) throws DiscoveryException, SchemaNotFoundException { @@ -199,14 +198,16 @@ public class DataSetLineageService implements LineageService { * * @param datasetName table name */ - private ReferenceableInstance validateDatasetNameExists(String datasetName) throws AtlasException { - final String tableExistsQuery = String.format(DATASET_NAME_EXISTS_QUERY, datasetName); - GremlinQueryResult queryResult = discoveryService.evaluate(tableExistsQuery, new QueryParams(1, 0)); - if (!(queryResult.rows().length() > 0)) { - throw new EntityNotFoundException(datasetName + " does not exist"); + private TypeUtils.Pair<String, String> validateDatasetNameExists(String datasetName) throws AtlasException { + Iterator<AtlasVertex> results = graph.query().has("Referenceable.qualifiedName", datasetName) + .has(Constants.STATE_PROPERTY_KEY, Id.EntityState.ACTIVE.name()) + .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE) + .vertices().iterator(); + while (results.hasNext()) { + AtlasVertex vertex = results.next(); + return TypeUtils.Pair.of(GraphHelper.getTypeName(vertex), GraphHelper.getIdFromVertex(vertex)); } - - return (ReferenceableInstance)queryResult.rows().apply(0); + throw new EntityNotFoundException("Dataset with name = " + datasetName + " does not exist"); } /** @@ -215,13 +216,13 @@ public class DataSetLineageService implements LineageService { * @param guid entity id */ private String validateDatasetExists(String guid) throws AtlasException { - final String datasetExistsQuery = String.format(DATASET_EXISTS_QUERY, guid); - GremlinQueryResult queryResult = discoveryService.evaluate(datasetExistsQuery, new QueryParams(1, 0)); - if (!(queryResult.rows().length() > 0)) { - throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist"); + Iterator<AtlasVertex> results = graph.query().has(Constants.GUID_PROPERTY_KEY, guid) + .has(Constants.SUPER_TYPES_PROPERTY_KEY, AtlasClient.DATA_SET_SUPER_TYPE) + .vertices().iterator(); + while (results.hasNext()) { + AtlasVertex vertex = results.next(); + return GraphHelper.getTypeName(vertex); } - - ReferenceableInstance referenceable = (ReferenceableInstance)queryResult.rows().apply(0); - return referenceable.getTypeName(); + throw new EntityNotFoundException("Dataset with guid = " + guid + " does not exist"); } } http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java index d7068cd..71a8756 100644 --- a/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java +++ b/repository/src/test/java/org/apache/atlas/BaseRepositoryTest.java @@ -17,12 +17,9 @@ */ package org.apache.atlas; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import javax.inject.Inject; - +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.atlas.repository.MetadataRepository; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graph.GraphBackedSearchIndexer; @@ -45,9 +42,10 @@ import org.apache.atlas.typesystem.types.TypeSystem; import org.apache.atlas.typesystem.types.utils.TypesUtil; import org.testng.annotations.Guice; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; /** * Base Class to set up hive types and instances for tests @@ -319,7 +317,7 @@ public class BaseRepositoryTest { List<Referenceable> columns, String... traitNames) throws Exception { Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); referenceable.set("name", name); - referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name); + referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, "qualified:" + name); referenceable.set("description", description); referenceable.set("owner", owner); referenceable.set("tableType", tableType); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a96424a1/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java index b675459..a0ee26c 100644 --- a/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java +++ b/repository/src/test/java/org/apache/atlas/discovery/DataSetLineageServiceTest.java @@ -34,6 +34,7 @@ import org.apache.atlas.typesystem.persistence.Id; import org.apache.commons.collections.ArrayStack; import org.apache.commons.lang.RandomStringUtils; import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -156,14 +157,14 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { testInvalidArguments(expectedException, new Invoker() { @Override void run() throws AtlasException { - lineageService.getInputsGraphForEntity(tableName); + lineageService.getInputsGraph(tableName); } }); } @Test public void testGetInputsGraph() throws Exception { - JSONObject results = new JSONObject(lineageService.getInputsGraph("sales_fact_monthly_mv")); + JSONObject results = getInputsGraph("sales_fact_monthly_mv"); assertNotNull(results); System.out.println("inputs graph = " + results); @@ -179,7 +180,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { @Test public void testCircularLineage() throws Exception{ - JSONObject results = new JSONObject(lineageService.getInputsGraph("table2")); + JSONObject results = getInputsGraph("table2"); assertNotNull(results); System.out.println("inputs graph = " + results); @@ -223,19 +224,19 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { } @Test(dataProvider = "invalidArgumentsProvider") - public void testGetOutputsGraphForEntityInvalidArguments(final String tableName, String expectedException) + public void testGetOutputsGraphForEntityInvalidArguments(final String tableId, String expectedException) throws Exception { testInvalidArguments(expectedException, new Invoker() { @Override void run() throws AtlasException { - lineageService.getOutputsGraphForEntity(tableName); + lineageService.getOutputsGraphForEntity(tableId); } }); } @Test public void testGetOutputsGraph() throws Exception { - JSONObject results = new JSONObject(lineageService.getOutputsGraph("sales_fact")); + JSONObject results = getOutputsGraph("sales_fact"); assertNotNull(results); System.out.println("outputs graph = " + results); @@ -276,7 +277,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { @Test(dataProvider = "tableNamesProvider") public void testGetSchema(String tableName, String expected) throws Exception { - JSONObject results = new JSONObject(lineageService.getSchema(tableName)); + JSONObject results = getSchema(tableName); assertNotNull(results); System.out.println("columns = " + results); @@ -284,11 +285,7 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { Assert.assertEquals(rows.length(), Integer.parseInt(expected)); for (int index = 0; index < rows.length(); index++) { - final JSONObject row = rows.getJSONObject(index); - assertNotNull(row.getString("name")); - assertNotNull(row.getString("comment")); - assertNotNull(row.getString("dataType")); - Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + assertColumn(rows.getJSONObject(index)); } } @@ -305,14 +302,17 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { Assert.assertEquals(rows.length(), Integer.parseInt(expected)); for (int index = 0; index < rows.length(); index++) { - final JSONObject row = rows.getJSONObject(index); - assertNotNull(row.getString("name")); - assertNotNull(row.getString("comment")); - assertNotNull(row.getString("dataType")); - Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + assertColumn(rows.getJSONObject(index)); } } + private void assertColumn(JSONObject jsonObject) throws JSONException { + assertNotNull(jsonObject.getString("name")); + assertNotNull(jsonObject.getString("comment")); + assertNotNull(jsonObject.getString("dataType")); + Assert.assertEquals(jsonObject.getString("$typeName$"), "hive_column"); + } + @Test(expectedExceptions = SchemaNotFoundException.class) public void testGetSchemaForDBEntity() throws Exception { String dbId = getEntityId(DATASET_SUBTYPE, "name", "dataSetSubTypeInst1"); @@ -359,23 +359,35 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { }); } + private JSONObject getSchema(String tableName) throws Exception { + return new JSONObject(lineageService.getSchema("qualified:" + tableName)); + } + + private JSONObject getInputsGraph(String tableName) throws Exception { + return new JSONObject(lineageService.getInputsGraph("qualified:" + tableName)); + } + + private JSONObject getOutputsGraph(String tableName) throws Exception { + return new JSONObject(lineageService.getOutputsGraph("qualified:" + tableName)); + } + @Test public void testLineageWithDelete() throws Exception { String tableName = "table" + random(); createTable(tableName, 3, true); String tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName); - JSONObject results = new JSONObject(lineageService.getSchema(tableName)); + JSONObject results = getSchema(tableName); assertEquals(results.getJSONArray("rows").length(), 3); - results = new JSONObject(lineageService.getInputsGraph(tableName)); + results = getInputsGraph(tableName); Struct resultInstance = InstanceSerialization.fromJsonStruct(results.toString(), true); Map<String, Struct> vertices = (Map) resultInstance.get("vertices"); assertEquals(vertices.size(), 2); Struct vertex = vertices.get(tableId); assertEquals(((Struct) vertex.get("vertexId")).get("state"), Id.EntityState.ACTIVE.name()); - results = new JSONObject(lineageService.getOutputsGraph(tableName)); + results = getOutputsGraph(tableName); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); results = new JSONObject(lineageService.getSchemaForEntity(tableId)); @@ -408,21 +420,21 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 2); try { - lineageService.getSchema(tableName); + getSchema(tableName); fail("Expected EntityNotFoundException"); } catch (EntityNotFoundException e) { //expected } try { - lineageService.getInputsGraph(tableName); + getInputsGraph(tableName); fail("Expected EntityNotFoundException"); } catch (EntityNotFoundException e) { //expected } try { - lineageService.getOutputsGraph(tableName); + getOutputsGraph(tableName); fail("Expected EntityNotFoundException"); } catch (EntityNotFoundException e) { //expected @@ -430,13 +442,13 @@ public class DataSetLineageServiceTest extends BaseRepositoryTest { //Create table again should show new lineage createTable(tableName, 2, false); - results = new JSONObject(lineageService.getSchema(tableName)); + results = getSchema(tableName); assertEquals(results.getJSONArray("rows").length(), 2); - results = new JSONObject(lineageService.getOutputsGraph(tableName)); + results = getOutputsGraph(tableName); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); - results = new JSONObject(lineageService.getInputsGraph(tableName)); + results = getInputsGraph(tableName); assertEquals(results.getJSONObject("values").getJSONObject("vertices").length(), 0); tableId = getEntityId(HIVE_TABLE_TYPE, "name", tableName);
