http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/hadoop/metadata/query/Resolver.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/hadoop/metadata/query/Resolver.scala b/repository/src/main/scala/org/apache/hadoop/metadata/query/Resolver.scala deleted file mode 100755 index bd98d30..0000000 --- a/repository/src/main/scala/org/apache/hadoop/metadata/query/Resolver.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metadata.query - -import org.apache.hadoop.metadata.query.Expressions._ -import org.apache.hadoop.metadata.typesystem.types.IDataType - -class Resolver(srcExpr: Option[Expression] = None, aliases: Map[String, Expression] = Map(), - connectClassExprToSrc: Boolean = false) - extends PartialFunction[Expression, Expression] { - - import org.apache.hadoop.metadata.query.TypeUtils._ - - def isDefinedAt(x: Expression) = true - - def apply(e: Expression): Expression = e match { - case idE@IdExpression(name) => { - val backExpr = aliases.get(name) - if (backExpr.isDefined) { - return new BackReference(name, backExpr.get, None) - } - if (srcExpr.isDefined) { - val fInfo = resolveReference(srcExpr.get.dataType, name) - if (fInfo.isDefined) { - return new FieldExpression(name, fInfo.get, None) - } - } - val cType = resolveAsClassType(name) - if (cType.isDefined) { - return new ClassExpression(name) - } - val tType = resolveAsTraitType(name) - if (tType.isDefined) { - return new TraitExpression(name) - } - idE - } - case ce@ClassExpression(clsName) if connectClassExprToSrc && srcExpr.isDefined => { - val fInfo = resolveReference(srcExpr.get.dataType, clsName) - if (fInfo.isDefined) { - return new FieldExpression(clsName, fInfo.get, None) - } - ce - } - case f@UnresolvedFieldExpression(child, fieldName) if child.resolved => { - var fInfo: Option[FieldInfo] = None - - fInfo = resolveReference(child.dataType, fieldName) - if (fInfo.isDefined) { - return new FieldExpression(fieldName, fInfo.get, Some(child)) - } - val tType = resolveAsTraitType(fieldName) - if (tType.isDefined) { - return new FieldExpression(fieldName, FieldInfo(child.dataType, null, null, fieldName), Some(child)) - } - f - } - case isTraitLeafExpression(traitName, classExpression) - if srcExpr.isDefined && !classExpression.isDefined => - isTraitLeafExpression(traitName, srcExpr) - case hasFieldLeafExpression(traitName, classExpression) - if srcExpr.isDefined && !classExpression.isDefined => - hasFieldLeafExpression(traitName, srcExpr) - case f@FilterExpression(inputExpr, condExpr) if inputExpr.resolved => { - val r = new Resolver(Some(inputExpr), inputExpr.namedExpressions) - return 
new FilterExpression(inputExpr, condExpr.transformUp(r)) - } - case SelectExpression(child, selectList) if child.resolved => { - val r = new Resolver(Some(child), child.namedExpressions) - return new SelectExpression(child, selectList.map { - _.transformUp(r) - }) - } - case l@LoopExpression(inputExpr, loopExpr, t) if inputExpr.resolved => { - val r = new Resolver(Some(inputExpr), inputExpr.namedExpressions, true) - return new LoopExpression(inputExpr, loopExpr.transformUp(r), t) - } - case x => x - } -} - -/** - * - any FieldReferences that explicitly reference the input can be converted to implicit references - * - any FieldReferences that explicitly reference a Class or Trait source whose type doesn't match the input type are flagged as errors - */ -object FieldValidator extends PartialFunction[Expression, Expression] { - - def isDefinedAt(x: Expression) = true - - def isSrc(e: Expression) = e.isInstanceOf[ClassExpression] || e.isInstanceOf[TraitExpression] - - def validateQualifiedField(srcDataType: IDataType[_]): PartialFunction[Expression, Expression] = { - case FieldExpression(fNm, fInfo, Some(child)) - if (child.children == Nil && !child.isInstanceOf[BackReference] && child.dataType == srcDataType) => - FieldExpression(fNm, fInfo, None) - case fe@FieldExpression(fNm, fInfo, Some(child)) if isSrc(child) => - throw new ExpressionException(fe, s"srcType of field doesn't match input type") - case hasFieldUnaryExpression(fNm, child) if child.dataType == srcDataType => - hasFieldLeafExpression(fNm) - case hF@hasFieldUnaryExpression(fNm, child) if isSrc(child) => - throw new ExpressionException(hF, s"srcType of field doesn't match input type") - case isTraitUnaryExpression(fNm, child) if child.dataType == srcDataType => - isTraitLeafExpression(fNm) - case iT@isTraitUnaryExpression(fNm, child) if isSrc(child) => - throw new ExpressionException(iT, s"srcType of field doesn't match input type") - } - - def validateOnlyFieldReferencesInLoopExpressions(loopExpr: LoopExpression) - : PartialFunction[Expression, Unit] = { - case f: FieldExpression => () - case x => throw new ExpressionException(loopExpr, - s"Loop Expression can only contain field references; '${x.toString}' not supported.") - } - - def apply(e: Expression): Expression = e match { - case f@FilterExpression(inputExpr, condExpr) => { - val validatedCE = condExpr.transformUp(validateQualifiedField(inputExpr.dataType)) - if (validatedCE.fastEquals(condExpr)) { - f - } else { - new FilterExpression(inputExpr, validatedCE) - } - } - case SelectExpression(child, selectList) if child.resolved => { - val v = validateQualifiedField(child.dataType) - return new SelectExpression(child, selectList.map { - _.transformUp(v) - }) - } - case l@LoopExpression(inputExpr, loopExpr, t) => { - val validatedLE = loopExpr.transformUp(validateQualifiedField(inputExpr.dataType)) - val l1 = { - if (validatedLE.fastEquals(loopExpr)) l - else new LoopExpression(inputExpr, validatedLE, t) - } - l1.loopingExpression.traverseUp(validateOnlyFieldReferencesInLoopExpressions(l1)) - l1 - } - case x => x - } -} \ No newline at end of file
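For context on the Resolver deleted above: it is total over the expression tree (isDefinedAt always returns true) and is applied bottom-up via transformUp, so child nodes are resolved before their parents and nodes no rule matches pass through unchanged. Below is a minimal sketch (not part of the commit) of how it is driven, assuming IdExpression and UnresolvedFieldExpression are case classes, as the pattern matches above suggest, and that a class type "Table" with a "name" attribute is already registered with the TypeSystem:

    import org.apache.hadoop.metadata.query.Expressions._

    // Unresolved tree for the DSL fragment `Table.name` (illustrative construction).
    val unresolved: Expression = UnresolvedFieldExpression(IdExpression("Table"), "name")

    // Top-level resolution: no enclosing source expression and no aliases,
    // relying on the defaults in Resolver's constructor.
    val resolver = new Resolver()

    // transformUp rewrites children first: IdExpression("Table") resolves to a
    // ClassExpression, and only then is "name" looked up against its dataType,
    // yielding a FieldExpression bound to that class.
    val resolved: Expression = unresolved.transformUp(resolver)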
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala ---------------------------------------------------------------------- diff --git a/repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala b/repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala deleted file mode 100755 index a7e4a6d..0000000 --- a/repository/src/main/scala/org/apache/hadoop/metadata/query/TypeUtils.scala +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.metadata.query - -import java.util -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.hadoop.metadata.MetadataException -import org.apache.hadoop.metadata.query.Expressions.{SelectExpression, PathExpression} -import org.apache.hadoop.metadata.typesystem.types.DataTypes.{ArrayType, PrimitiveType, TypeCategory} -import org.apache.hadoop.metadata.typesystem.types._ - -object TypeUtils { - val typSystem = TypeSystem.getInstance() - - def numericTypes : Seq[PrimitiveType[_]] = Seq(DataTypes.BYTE_TYPE, - DataTypes.SHORT_TYPE, - DataTypes.INT_TYPE, - DataTypes.FLOAT_TYPE, - DataTypes.LONG_TYPE, - DataTypes.DOUBLE_TYPE, - DataTypes.BIGINTEGER_TYPE, - DataTypes.BIGDECIMAL_TYPE) - - def combinedType(typ1 : IDataType[_], typ2 : IDataType[_]) : PrimitiveType[_] = { - val typ1Idx = if (numericTypes.contains(typ1)) Some(numericTypes.indexOf(typ1)) else None - val typ2Idx = if (numericTypes.contains(typ2)) Some(numericTypes.indexOf(typ2)) else None - - if ( typ1Idx.isDefined && typ2Idx.isDefined ) { - val rIdx = math.max(typ1Idx.get, typ2Idx.get) - - if ( (typ1 == DataTypes.FLOAT_TYPE && typ2 == DataTypes.LONG_TYPE) || - (typ1 == DataTypes.LONG_TYPE && typ2 == DataTypes.FLOAT_TYPE) ) { - return DataTypes.DOUBLE_TYPE - } - return numericTypes(rIdx) - } - - throw new MetadataException(s"Cannot combine types: ${typ1.getName} and ${typ2.getName}") - } - - var tempStructCounter : AtomicInteger = new AtomicInteger(0) - val TEMP_STRUCT_NAME_PREFIX = "__tempQueryResultStruct" - def createStructType(selectExprs : List[Expressions.AliasExpression]) : StructType = { - val aDefs = new Array[AttributeDefinition](selectExprs.size) - selectExprs.zipWithIndex.foreach { t => - val (e,i) = t - aDefs(i) = new AttributeDefinition(e.alias,e.dataType.getName, Multiplicity.OPTIONAL, false, null) - } - return typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}", - null, - aDefs:_*); - } - - object ResultWithPathStruct { - val pathAttrName = "path" - val resultAttrName = "result" - val pathAttrType = DataTypes.arrayTypeName(typSystem.getIdType.getStructType) - - val pathAttr = new 
AttributeDefinition(pathAttrName, pathAttrType, Multiplicity.COLLECTION, false, null) - - def createType(pE : PathExpression, resultType : IDataType[_]) : StructType = { - val resultAttr = new AttributeDefinition(resultAttrName, resultType.getName, Multiplicity.REQUIRED, false, null) - val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}" - val m : java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]() - if ( pE.child.isInstanceOf[SelectExpression]) { - m.put(pE.child.dataType.getName, pE.child.dataType) - } - typSystem.defineQueryResultType(typName, m, pathAttr, resultAttr); - } - } - - /** - * Structure representing the Closure Graph. - * Returns: - * 1. A map of vertexId -> vertex info (these are the attributes requested in the query) - * 2. An edges map: each entry is a mapping from a vertexId to the list of adjacent vertexIds. - * - * '''The Vertex Map doesn't contain all the vertices in the Graph, only the ones for which Attributes are - * available.''' These are the vertices that represent the EntityType whose Closure was requested. E.g. for - * Table Lineage the ''vertex map'' will contain information about Tables, but not about ''Load Process'' vertices - * that connect Tables. - */ - object GraphResultStruct { - val SRC_PREFIX = "src" - val DEST_PREFIX = "dest" - - val verticesAttrName = "vertices" - val edgesAttrName = "edges" - val vertexIdAttrName = "vertexId" - - lazy val edgesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE, - typSystem.defineArrayType(DataTypes.STRING_TYPE)) - - def createType(resultWithPathType: StructType): StructType = { - val resultType = resultWithPathType.fieldMapping().fields.get(ResultWithPathStruct.resultAttrName).dataType() - - val verticesAttrType = typSystem.defineMapType(DataTypes.STRING_TYPE, - vertexType(resultType.asInstanceOf[StructType])) - val typName = s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}" - val verticesAttr = new AttributeDefinition(verticesAttrName, verticesAttrType.getName, - Multiplicity.REQUIRED, false, null) - val edgesAttr = new AttributeDefinition(edgesAttrName, edgesAttrType.getName, Multiplicity.REQUIRED, false, null) - - val m: java.util.HashMap[String, IDataType[_]] = new util.HashMap[String, IDataType[_]]() - m.put(resultWithPathType.getName, resultWithPathType) - m.put(resultType.getName, resultType) - m.put(edgesAttrType.getName, edgesAttrType) - m.put(verticesAttrType.getName, verticesAttrType) - typSystem.defineQueryResultType(typName, m, verticesAttr, edgesAttr) - } - - private def vertexType(resultType: StructType): StructType = { - - import scala.collection.JavaConverters._ - - var attrs: List[AttributeDefinition] = - resultType.fieldMapping.fields.asScala.filter(_._1.startsWith(s"${SRC_PREFIX}_")).mapValues { aInfo => - - new AttributeDefinition(aInfo.name.substring(s"${SRC_PREFIX}_".length), aInfo.dataType.getName, - aInfo.multiplicity, aInfo.isComposite, aInfo.reverseAttributeName) - }.values.toList - - attrs = new AttributeDefinition(vertexIdAttrName, typSystem.getIdType.getStructType.name, - Multiplicity.REQUIRED, false, null) :: attrs - - return typSystem.defineQueryResultType(s"${TEMP_STRUCT_NAME_PREFIX}${tempStructCounter.getAndIncrement}", - null, - attrs: _*) - } - } - - def fieldMapping(iDataType: IDataType[_]) : Option[FieldMapping] = iDataType match { - case c : ClassType => Some(c.fieldMapping()) - case t : TraitType => Some(t.fieldMapping()) - case s : StructType => Some(s.fieldMapping()) - case _ => None - } - 
- def hasFields(iDataType: IDataType[_]) : Boolean = { - fieldMapping(iDataType).isDefined - } - - import scala.language.existentials - case class FieldInfo(dataType : IDataType[_], - attrInfo : AttributeInfo, - reverseDataType : IDataType[_] = null, - traitName : String = null) { - def isReverse = reverseDataType != null - override def toString : String = { - if ( traitName != null ) { - s"""FieldInfo("${dataType.getName}", "$traitName")""" - } - else if ( reverseDataType == null ) { - s"""FieldInfo("${dataType.getName}", "${attrInfo.name}")""" - } else { - s"""FieldInfo("${dataType.getName}", "${attrInfo.name}", "${reverseDataType.getName}")""" - } - } - } - - val FIELD_QUALIFIER = "(.*?)(->.*)?".r - - /** - * Given a ComposedType `t` and a name `id`, resolve using the following rules: - * - if `id` is a field in `t` resolve to the field - * - if `id` is the name of a Struct|Class|Trait Type and it has a field that is of type `t` then return that type - * - * E.g. - * 1. if we have types Table(name : String, cols : List[Column]), Column(name : String) then - * `resolveReference(Table, "cols")` resolves to type Column. So a query can be "Table.cols" - * 2. But if we have Table(name : String), Column(name : String, tbl : Table) then "Table.Column" will resolve - * to type Column - * - * This way the language will support navigation even if the relationship is one-sided. - * - * @param typ - * @param id - * @return - */ - def resolveReference(typ : IDataType[_], id : String) : Option[FieldInfo] = { - - val fMap = fieldMapping(typ) - if ( fMap.isDefined) { - - if (fMap.get.fields.containsKey(id)) { - return Some(FieldInfo(typ,fMap.get.fields.get(id))) - } - - try { - val FIELD_QUALIFIER(clsNm, rest) = id - val idTyp = typSystem.getDataType(classOf[IDataType[_]], clsNm) - val idTypFMap = fieldMapping(idTyp) - - if (rest != null ) { - val attrNm = rest.substring(2) - - if (idTypFMap.get.fields.containsKey(attrNm)) { - return Some(FieldInfo(typ,idTypFMap.get.fields.get(attrNm), idTyp)) - } - } - - if (idTypFMap.isDefined) { - import scala.collection.JavaConversions._ - val fields: Seq[AttributeInfo] = idTypFMap.get.fields.values().filter { aInfo => - aInfo.dataType() == typ || - ( aInfo.dataType().getTypeCategory == TypeCategory.ARRAY && - aInfo.dataType().asInstanceOf[ArrayType].getElemType == typ - ) - }.toSeq - if (fields.size == 1) { - return Some(FieldInfo(typ, fields(0), idTyp)) - } - /* - * Is there only one array field of this type? - * If yes, resolve to it. - * @todo: allow the user to specify the relationship to follow by further qualifying the type, e.g. 
- * field("LoadProcess.inputTables") - */ - val aFields = fields.filter { aInfo => aInfo.dataType().getTypeCategory == TypeCategory.ARRAY} - if (aFields.size == 1) { - return Some(FieldInfo(typ, aFields(0), idTyp)) - } - } - } catch { - case _ : MetadataException => None - } - } - None - } - - def resolveAsClassType(id : String) : Option[ClassType] = { - try { - Some(typSystem.getDataType(classOf[ClassType], id)) - } catch { - case _ : MetadataException => None - } - } - - def resolveAsTraitType(id : String) : Option[TraitType] = { - try { - Some(typSystem.getDataType(classOf[TraitType], id)) - } catch { - case _ : MetadataException => None - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/test/java/org/apache/atlas/RepositoryServiceLoadingTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/RepositoryServiceLoadingTest.java b/repository/src/test/java/org/apache/atlas/RepositoryServiceLoadingTest.java new file mode 100755 index 0000000..4195955 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/RepositoryServiceLoadingTest.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import com.thinkaurelius.titan.core.TitanGraph; +import org.apache.atlas.repository.graph.GraphProvider; +import org.testng.Assert; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; + +/** + * Unit test for Guice injector service loading + * + * Uses TestNG's Guice annotation to load the necessary modules and inject the + * objects from Guice + */ +@Guice(modules = RepositoryMetadataModule.class) +public class RepositoryServiceLoadingTest { + + @Inject + private GraphProvider<TitanGraph> graphProvider; + + @Test + public void testGetGraphService() throws Exception { + Assert.assertNotNull(graphProvider); + Assert.assertNotNull(graphProvider.get()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/test/java/org/apache/atlas/TestUtils.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/TestUtils.java b/repository/src/test/java/org/apache/atlas/TestUtils.java new file mode 100755 index 0000000..c7f35fc --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/TestUtils.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas; + +import com.google.common.collect.ImmutableList; +import com.thinkaurelius.titan.core.TitanGraph; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; +import com.tinkerpop.blueprints.util.io.graphson.GraphSONWriter; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.types.AttributeDefinition; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.EnumType; +import org.apache.atlas.typesystem.types.EnumTypeDefinition; +import org.apache.atlas.typesystem.types.EnumValue; +import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.StructTypeDefinition; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.testng.Assert; + +import java.io.File; + +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createStructTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createTraitTypeDef; + +/** + * Test utility class. + */ +public final class TestUtils { + + private TestUtils() { + } + + /** + * Dumps the graph in GraphSON format at the path returned. + * + * @param titanGraph handle to graph + * @return path to the dump file + * @throws Exception + */ + public static String dumpGraph(TitanGraph titanGraph) throws Exception { + File tempFile = File.createTempFile("graph", ".gson"); + System.out.println("tempFile.getPath() = " + tempFile.getPath()); + GraphSONWriter.outputGraph(titanGraph, tempFile.getPath()); + + System.out.println("Vertices:"); + for (Vertex vertex : titanGraph.getVertices()) { + System.out.println(GraphHelper.vertexString(vertex)); + } + + System.out.println("Edges:"); + for (Edge edge : titanGraph.getEdges()) { + System.out.println(GraphHelper.edgeString(edge)); + } + + return tempFile.getPath(); + } + + /** + * The class hierarchy is: + * Department(name : String, employees : Array[Person]) + * Person(name : String, department : Department, manager : Manager) + * Manager(subordinates : Array[Person]) extends Person + * <p/> + * Persons can have SecurityClearance(level : Int) clearance. 
+ */ + public static void defineDeptEmployeeTypes(TypeSystem ts) throws MetadataException { + + EnumTypeDefinition orgLevelEnum = + new EnumTypeDefinition("OrgLevel", new EnumValue("L1", 1), new EnumValue("L2", 2)); + ts.defineEnumType(orgLevelEnum); + + StructTypeDefinition addressDetails = createStructTypeDef("Address", + createRequiredAttrDef("street", DataTypes.STRING_TYPE), + createRequiredAttrDef("city", DataTypes.STRING_TYPE)); + + HierarchicalTypeDefinition<ClassType> deptTypeDef = + createClassTypeDef("Department", ImmutableList.<String>of(), + createRequiredAttrDef("name", DataTypes.STRING_TYPE), + new AttributeDefinition("employees", + String.format("array<%s>", "Person"), Multiplicity.COLLECTION, true, + "department") + ); + + HierarchicalTypeDefinition<ClassType> personTypeDef = createClassTypeDef("Person", + ImmutableList.<String>of(), + createRequiredAttrDef("name", DataTypes.STRING_TYPE), + createOptionalAttrDef("orgLevel", ts.getDataType(EnumType.class, "OrgLevel")), + createOptionalAttrDef("address", "Address"), + new AttributeDefinition("department", + "Department", Multiplicity.REQUIRED, false, "employees"), + new AttributeDefinition("manager", + "Manager", Multiplicity.OPTIONAL, false, "subordinates") + ); + + HierarchicalTypeDefinition<ClassType> managerTypeDef = createClassTypeDef("Manager", + ImmutableList.of("Person"), + new AttributeDefinition("subordinates", + String.format("array<%s>", "Person"), Multiplicity.COLLECTION, false, + "manager") + ); + + HierarchicalTypeDefinition<TraitType> securityClearanceTypeDef = createTraitTypeDef( + "SecurityClearance", + ImmutableList.<String>of(), + createRequiredAttrDef("level", DataTypes.INT_TYPE) + ); + + ts.defineTypes(ImmutableList.of(addressDetails), + ImmutableList.of(securityClearanceTypeDef), + ImmutableList.of(deptTypeDef, personTypeDef, managerTypeDef)); + } + + public static Referenceable createDeptEg1(TypeSystem ts) throws MetadataException { + Referenceable hrDept = new Referenceable("Department"); + Referenceable john = new Referenceable("Person"); + Referenceable jane = new Referenceable("Manager", "SecurityClearance"); + Referenceable johnAddr = new Referenceable("Address"); + Referenceable janeAddr = new Referenceable("Address"); + + hrDept.set("name", "hr"); + john.set("name", "John"); + john.set("department", hrDept); + johnAddr.set("street", "Stewart Drive"); + johnAddr.set("city", "Sunnyvale"); + john.set("address", johnAddr); + + jane.set("name", "Jane"); + jane.set("department", hrDept); + janeAddr.set("street", "Great America Parkway"); + janeAddr.set("city", "Santa Clara"); + jane.set("address", janeAddr); + + john.set("manager", jane); + + hrDept.set("employees", ImmutableList.of(john, jane)); + + jane.set("subordinates", ImmutableList.of(john)); + + jane.getTrait("SecurityClearance").set("level", 1); + + ClassType deptType = ts.getDataType(ClassType.class, "Department"); + ITypedReferenceableInstance hrDept2 = deptType.convert(hrDept, Multiplicity.REQUIRED); + Assert.assertNotNull(hrDept2); + + return hrDept; + } +} http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java new file mode 100755 index 0000000..4d12ccd --- /dev/null +++ 
b/repository/src/test/java/org/apache/atlas/discovery/GraphBackedDiscoveryServiceTest.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.atlas.discovery; + +import com.google.common.collect.ImmutableList; +import com.thinkaurelius.titan.core.TitanGraph; +import com.tinkerpop.blueprints.Edge; +import com.tinkerpop.blueprints.Vertex; +import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.TestUtils; +import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; +import org.apache.atlas.query.HiveTitanSample; +import org.apache.atlas.query.QueryTestsUtils; +import org.apache.atlas.repository.graph.GraphBackedMetadataRepository; +import org.apache.atlas.repository.graph.GraphHelper; +import org.apache.atlas.repository.graph.GraphProvider; +import org.apache.atlas.typesystem.ITypedReferenceableInstance; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.TypeSystem; +import org.apache.commons.io.FileUtils; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import javax.script.Bindings; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import java.io.File; + +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createClassTypeDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createOptionalAttrDef; +import static org.apache.atlas.typesystem.types.utils.TypesUtil.createRequiredAttrDef; + +@Guice(modules = RepositoryMetadataModule.class) +public class GraphBackedDiscoveryServiceTest { + + @Inject + private GraphProvider<TitanGraph> graphProvider; + + @Inject + private GraphBackedMetadataRepository repositoryService; + + @Inject + private GraphBackedDiscoveryService discoveryService; + + @BeforeClass + public void setUp() throws Exception { + TypeSystem typeSystem = TypeSystem.getInstance(); + typeSystem.reset(); + + QueryTestsUtils.setupTypes(); + setupSampleData(); + + TestUtils.defineDeptEmployeeTypes(typeSystem); + + Referenceable hrDept = TestUtils.createDeptEg1(typeSystem); + ClassType deptType = typeSystem.getDataType(ClassType.class, "Department"); + ITypedReferenceableInstance hrDept2 = 
deptType.convert(hrDept, Multiplicity.REQUIRED); + + repositoryService.createEntity(hrDept2); + } + + private void setupSampleData() throws ScriptException { + TitanGraph titanGraph = graphProvider.get(); + + ScriptEngineManager manager = new ScriptEngineManager(); + ScriptEngine engine = manager.getEngineByName("gremlin-groovy"); + Bindings bindings = engine.createBindings(); + bindings.put("g", titanGraph); + + String hiveGraphFile = FileUtils.getTempDirectory().getPath() + + File.separator + System.nanoTime() + ".gson"; + System.out.println("hiveGraphFile = " + hiveGraphFile); + HiveTitanSample.writeGson(hiveGraphFile); + bindings.put("hiveGraphFile", hiveGraphFile); + + engine.eval("g.loadGraphSON(hiveGraphFile)", bindings); + titanGraph.commit(); + + System.out.println("*******************Graph Dump****************************"); + for (Vertex vertex : titanGraph.getVertices()) { + System.out.println(GraphHelper.vertexString(vertex)); + } + + for (Edge edge : titanGraph.getEdges()) { + System.out.println(GraphHelper.edgeString(edge)); + } + System.out.println("*******************Graph Dump****************************"); + } + + @AfterClass + public void tearDown() throws Exception { + TypeSystem.getInstance().reset(); + } + + @Test + public void testSearchByDSL() throws Exception { + String dslQuery = "from Department"; + + String jsonResults = discoveryService.searchByDSL(dslQuery); + Assert.assertNotNull(jsonResults); + + JSONObject results = new JSONObject(jsonResults); + Assert.assertEquals(results.length(), 3); + System.out.println("results = " + results); + + Object query = results.get("query"); + Assert.assertNotNull(query); + + JSONObject dataType = results.getJSONObject("dataType"); + Assert.assertNotNull(dataType); + String typeName = dataType.getString("typeName"); + Assert.assertNotNull(typeName); + Assert.assertEquals(typeName, "Department"); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertNotNull(rows); + Assert.assertEquals(rows.length(), 1); + } + + @Test(expectedExceptions = Throwable.class) + public void testSearchByDSLBadQuery() throws Exception { + String dslQuery = "from blah"; + + discoveryService.searchByDSL(dslQuery); + Assert.fail(); + } + + @Test + public void testRawSearch1() throws Exception { + // Query for all Vertices in Graph + Object r = discoveryService.searchByGremlin("g.V.toList()"); + System.out.println("search result = " + r); + + // Query for all Vertices of a Type + r = discoveryService.searchByGremlin("g.V.filter{it.typeName == 'Department'}.toList()"); + System.out.println("search result = " + r); + + // Property Query: list all Person names + r = discoveryService + .searchByGremlin("g.V.filter{it.typeName == 'Person'}.'Person.name'.toList()"); + System.out.println("search result = " + r); + } + + @DataProvider(name = "dslQueriesProvider") + private Object[][] createDSLQueries() { + return new String[][] { + {"from DB"}, + {"DB"}, + {"DB where DB.name=\"Reporting\""}, + {"DB DB.name = \"Reporting\""}, + {"DB where DB.name=\"Reporting\" select name, owner"}, + {"DB has name"}, + {"DB, Table"}, + {"DB is JdbcAccess"}, + /* + {"DB, LoadProcess has name"}, + {"DB as db1, Table where db1.name = \"Reporting\""}, + {"DB where DB.name=\"Reporting\" and DB.createTime < " + System.currentTimeMillis()}, + */ + {"from Table"}, + {"Table"}, + {"Table is Dimension"}, + {"Column where Column isa PII"}, + {"View is Dimension"}, + /*{"Column where Column isa PII select Column.name"},*/ + {"Column select Column.name"}, + {"Column select 
name"}, + {"Column where Column.name=\"customer_id\""}, + {"from Table select Table.name"}, + {"DB where (name = \"Reporting\")"}, + {"DB where (name = \"Reporting\") select name as _col_0, owner as _col_1"}, + {"DB where DB is JdbcAccess"}, + {"DB where DB has name"}, + {"DB Table"}, + {"DB where DB has name"}, + {"DB as db1 Table where (db1.name = \"Reporting\")"}, + {"DB where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 "}, + /* + todo: does not work + {"DB where (name = \"Reporting\") and ((createTime + 1) > 0)"}, + {"DB as db1 Table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") select db1.name as dbName, tab.name as tabName"}, + {"DB as db1 Table as tab where ((db1.createTime + 1) > 0) or (db1.name = \"Reporting\") select db1.name as dbName, tab.name as tabName"}, + {"DB as db1 Table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"}, + {"DB as db1 Table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"}, + */ + // trait searches + {"Dimension"}, + /*{"Fact"}, - todo: does not work*/ + {"JdbcAccess"}, + {"ETL"}, + {"Metric"}, + {"PII"}, + // Lineage + {"Table LoadProcess outputTable"}, + {"Table loop (LoadProcess outputTable)"}, + {"Table as _loop0 loop (LoadProcess outputTable) withPath"}, + {"Table as src loop (LoadProcess outputTable) as dest select src.name as srcTable, dest.name as destTable withPath"}, + {"Table as t, sd, Column as c where t.name=\"sales_fact\" select c.name as colName, c.dataType as colType"}, + {"Table where name='sales_fact', db where name='Reporting'"} + }; + } + + @Test (dataProvider = "dslQueriesProvider") + public void testSearchByDSLQueries(String dslQuery) throws Exception { + System.out.println("Executing dslQuery = " + dslQuery); + String jsonResults = discoveryService.searchByDSL(dslQuery); + Assert.assertNotNull(jsonResults); + + JSONObject results = new JSONObject(jsonResults); + Assert.assertEquals(results.length(), 3); + System.out.println("results = " + results); + + Object query = results.get("query"); + Assert.assertNotNull(query); + + JSONObject dataType = results.getJSONObject("dataType"); + Assert.assertNotNull(dataType); + String typeName = dataType.getString("typeName"); + Assert.assertNotNull(typeName); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertNotNull(rows); + Assert.assertTrue(rows.length() >= 0); // some queries may not have any results + System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows"); + } + + @DataProvider(name = "invalidDslQueriesProvider") + private Object[][] createInvalidDSLQueries() { + return new String[][] { + {"from Unknown"}, + {"Unknown"}, + {"Unknown is Blah"}, + }; + } + + @Test (dataProvider = "invalidDslQueriesProvider", expectedExceptions = DiscoveryException.class) + public void testSearchByDSLInvalidQueries(String dslQuery) throws Exception { + System.out.println("Executing dslQuery = " + dslQuery); + discoveryService.searchByDSL(dslQuery); + Assert.fail(); + } + + @Test + public void testSearchForTypeInheritance() throws Exception { + createTypesWithMultiLevelInheritance(); + createInstances(); + + String dslQuery = "from D where a = 1"; + String jsonResults = discoveryService.searchByDSL(dslQuery); + Assert.assertNotNull(jsonResults); + + JSONObject results = new JSONObject(jsonResults); + System.out.println("results = " + results); 
+ } + + /* + * Type Hierarchy is: + * A(a) + * B(b) extends A + * C(c) extends B + * D(d) extends C + */ + private void createTypesWithMultiLevelInheritance() throws Exception { + HierarchicalTypeDefinition A = createClassTypeDef("A", null, + createRequiredAttrDef("a", DataTypes.INT_TYPE)); + + HierarchicalTypeDefinition B = createClassTypeDef("B", ImmutableList.of("A"), + createOptionalAttrDef("b", DataTypes.BOOLEAN_TYPE)); + + HierarchicalTypeDefinition C = createClassTypeDef("C", ImmutableList.of("B"), + createOptionalAttrDef("c", DataTypes.BYTE_TYPE)); + + HierarchicalTypeDefinition D = createClassTypeDef("D", ImmutableList.of("C"), + createOptionalAttrDef("d", DataTypes.SHORT_TYPE)); + + TypeSystem.getInstance().defineClassTypes(A, B, C, D); + } + + private void createInstances() throws Exception { + Referenceable instance = new Referenceable("D"); + instance.set("d", 1); + instance.set("c", 1); + instance.set("b", true); + instance.set("a", 1); + + ClassType deptType = TypeSystem.getInstance().getDataType(ClassType.class, "D"); + ITypedReferenceableInstance typedInstance = + deptType.convert(instance, Multiplicity.REQUIRED); + + repositoryService.createEntity(typedInstance); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java b/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java new file mode 100644 index 0000000..9141d85 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/discovery/HiveLineageServiceTest.java @@ -0,0 +1,593 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.atlas.discovery; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.atlas.RepositoryMetadataModule; +import org.apache.atlas.discovery.graph.GraphBackedDiscoveryService; +import org.apache.atlas.repository.EntityNotFoundException; +import org.apache.atlas.services.DefaultMetadataService; +import org.apache.atlas.typesystem.Referenceable; +import org.apache.atlas.typesystem.TypesDef; +import org.apache.atlas.typesystem.json.InstanceSerialization; +import org.apache.atlas.typesystem.json.TypesSerialization; +import org.apache.atlas.typesystem.persistence.Id; +import org.apache.atlas.typesystem.types.AttributeDefinition; +import org.apache.atlas.typesystem.types.ClassType; +import org.apache.atlas.typesystem.types.DataTypes; +import org.apache.atlas.typesystem.types.EnumTypeDefinition; +import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition; +import org.apache.atlas.typesystem.types.IDataType; +import org.apache.atlas.typesystem.types.Multiplicity; +import org.apache.atlas.typesystem.types.StructTypeDefinition; +import org.apache.atlas.typesystem.types.TraitType; +import org.apache.atlas.typesystem.types.TypeUtils; +import org.apache.atlas.typesystem.types.utils.TypesUtil; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import javax.inject.Inject; +import java.util.List; + +/** + * Unit tests for Hive LineageService. + */ +@Guice(modules = RepositoryMetadataModule.class) +public class HiveLineageServiceTest { + + @Inject + private DefaultMetadataService metadataService; + + @Inject + private GraphBackedDiscoveryService discoveryService; + + @Inject + private HiveLineageService hiveLineageService; + +// @Inject +// private GraphProvider<TitanGraph> graphProvider; + + @BeforeClass + public void setUp() throws Exception { + setUpTypes(); + setupInstances(); + + // TestUtils.dumpGraph(graphProvider.get()); + } + + @DataProvider(name = "dslQueriesProvider") + private Object[][] createDSLQueries() { + return new String[][] { + // joins + {"hive_table where name=\"sales_fact\", columns"}, + {"hive_table where name=\"sales_fact\", columns select name, dataType, comment"}, + {"hive_table where name=\"sales_fact\", columns as c select c.name, c.dataType, c.comment"}, +// {"hive_db as db where (db.name=\"Reporting\"), hive_table as table select db.name, table.name"}, + {"from hive_db"}, + {"hive_db"}, + {"hive_db where hive_db.name=\"Reporting\""}, + {"hive_db hive_db.name = \"Reporting\""}, + {"hive_db where hive_db.name=\"Reporting\" select name, owner"}, + {"hive_db has name"}, +// {"hive_db, hive_table"}, +// {"hive_db, hive_process has name"}, +// {"hive_db as db1, hive_table where db1.name = \"Reporting\""}, +// {"hive_db where hive_db.name=\"Reporting\" and hive_db.createTime < " + System.currentTimeMillis()}, + {"from hive_table"}, + {"hive_table"}, + {"hive_table is Dimension"}, + {"hive_column where hive_column isa PII"}, +// {"hive_column where hive_column isa PII select hive_column.name"}, + {"hive_column select hive_column.name"}, + {"hive_column select name"}, + {"hive_column where hive_column.name=\"customer_id\""}, + {"from hive_table select hive_table.name"}, + {"hive_db where (name = \"Reporting\")"}, + {"hive_db where (name = \"Reporting\") select 
name as _col_0, owner as _col_1"}, + {"hive_db where hive_db has name"}, +// {"hive_db hive_table"}, + {"hive_db where hive_db has name"}, +// {"hive_db as db1 hive_table where (db1.name = \"Reporting\")"}, + {"hive_db where (name = \"Reporting\") select name as _col_0, (createTime + 1) as _col_1 "}, +// {"hive_db where (name = \"Reporting\") and ((createTime + 1) > 0)"}, +// {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") select db1.name as dbName, tab.name as tabName"}, +// {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) or (db1.name = \"Reporting\") select db1.name as dbName, tab.name as tabName"}, +// {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"}, +// {"hive_db as db1 hive_table as tab where ((db1.createTime + 1) > 0) and (db1.name = \"Reporting\") or db1 has owner select db1.name as dbName, tab.name as tabName"}, + // trait searches + {"Dimension"}, + {"Fact"}, + {"ETL"}, + {"Metric"}, + {"PII"}, + }; + } + + @Test (dataProvider = "dslQueriesProvider") + public void testSearchByDSLQueries(String dslQuery) throws Exception { + System.out.println("Executing dslQuery = " + dslQuery); + String jsonResults = discoveryService.searchByDSL(dslQuery); + Assert.assertNotNull(jsonResults); + + JSONObject results = new JSONObject(jsonResults); + Assert.assertEquals(results.length(), 3); + System.out.println("results = " + results); + + Object query = results.get("query"); + Assert.assertNotNull(query); + + JSONObject dataType = results.getJSONObject("dataType"); + Assert.assertNotNull(dataType); + String typeName = dataType.getString("typeName"); + Assert.assertNotNull(typeName); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertNotNull(rows); + Assert.assertTrue(rows.length() >= 0); // some queries may not have any results + System.out.println("query [" + dslQuery + "] returned [" + rows.length() + "] rows"); + } + + @Test + public void testGetInputs() throws Exception { + JSONObject results = new JSONObject(hiveLineageService.getInputs("sales_fact_monthly_mv")); + Assert.assertNotNull(results); + System.out.println("inputs = " + results); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertTrue(rows.length() > 0); + + final JSONObject row = rows.getJSONObject(0); + JSONArray paths = row.getJSONArray("path"); + Assert.assertTrue(paths.length() > 0); + } + + @Test (expectedExceptions = IllegalArgumentException.class) + public void testGetInputsTableNameNull() throws Exception { + hiveLineageService.getInputs(null); + Assert.fail(); + } + + @Test (expectedExceptions = IllegalArgumentException.class) + public void testGetInputsTableNameEmpty() throws Exception { + hiveLineageService.getInputs(""); + Assert.fail(); + } + + @Test (expectedExceptions = EntityNotFoundException.class) + public void testGetInputsBadTableName() throws Exception { + hiveLineageService.getInputs("blah"); + Assert.fail(); + } + + @Test + public void testGetInputsGraph() throws Exception { + JSONObject results = new JSONObject( + hiveLineageService.getInputsGraph("sales_fact_monthly_mv")); + Assert.assertNotNull(results); + System.out.println("inputs graph = " + results); + + JSONObject values = results.getJSONObject("values"); + Assert.assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 4); + + final JSONObject edges = 
values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @Test + public void testGetOutputs() throws Exception { + JSONObject results = new JSONObject(hiveLineageService.getOutputs("sales_fact")); + Assert.assertNotNull(results); + System.out.println("outputs = " + results); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertTrue(rows.length() > 0); + + final JSONObject row = rows.getJSONObject(0); + JSONArray paths = row.getJSONArray("path"); + Assert.assertTrue(paths.length() > 0); + } + + @Test (expectedExceptions = IllegalArgumentException.class) + public void testGetOutputsTableNameNull() throws Exception { + hiveLineageService.getOutputs(null); + Assert.fail(); + } + + @Test (expectedExceptions = IllegalArgumentException.class) + public void testGetOutputsTableNameEmpty() throws Exception { + hiveLineageService.getOutputs(""); + Assert.fail(); + } + + @Test (expectedExceptions = EntityNotFoundException.class) + public void testGetOutputsBadTableName() throws Exception { + hiveLineageService.getOutputs("blah"); + Assert.fail(); + } + + @Test + public void testGetOutputsGraph() throws Exception { + JSONObject results = new JSONObject(hiveLineageService.getOutputsGraph("sales_fact")); + Assert.assertNotNull(results); + System.out.println("outputs graph = " + results); + + JSONObject values = results.getJSONObject("values"); + Assert.assertNotNull(values); + + final JSONObject vertices = values.getJSONObject("vertices"); + Assert.assertEquals(vertices.length(), 3); + + final JSONObject edges = values.getJSONObject("edges"); + Assert.assertEquals(edges.length(), 4); + } + + @DataProvider(name = "tableNamesProvider") + private Object[][] tableNames() { + return new String[][] { + {"sales_fact", "4"}, + {"time_dim", "3"}, + {"sales_fact_daily_mv", "4"}, + {"sales_fact_monthly_mv", "4"} + }; + } + + @Test (dataProvider = "tableNamesProvider") + public void testGetSchema(String tableName, String expected) throws Exception { + JSONObject results = new JSONObject(hiveLineageService.getSchema(tableName)); + Assert.assertNotNull(results); + System.out.println("columns = " + results); + + JSONArray rows = results.getJSONArray("rows"); + Assert.assertEquals(rows.length(), Integer.parseInt(expected)); + + for (int index = 0; index < rows.length(); index++) { + final JSONObject row = rows.getJSONObject(index); + Assert.assertNotNull(row.getString("name")); + Assert.assertNotNull(row.getString("comment")); + Assert.assertNotNull(row.getString("dataType")); + Assert.assertEquals(row.getString("$typeName$"), "hive_column"); + } + } + + @Test (expectedExceptions = IllegalArgumentException.class) + public void testGetSchemaTableNameNull() throws Exception { + hiveLineageService.getSchema(null); + Assert.fail(); + } + + @Test (expectedExceptions = IllegalArgumentException.class) + public void testGetSchemaTableNameEmpty() throws Exception { + hiveLineageService.getSchema(""); + Assert.fail(); + } + + @Test (expectedExceptions = EntityNotFoundException.class) + public void testGetSchemaBadTableName() throws Exception { + hiveLineageService.getSchema("blah"); + Assert.fail(); + } + + private void setUpTypes() throws Exception { + TypesDef typesDef = createTypeDefinitions(); + String typesAsJSON = TypesSerialization.toJson(typesDef); + metadataService.createType(typesAsJSON); + } + + private static final String DATABASE_TYPE = "hive_db"; + private static final String HIVE_TABLE_TYPE = "hive_table"; + private static final String COLUMN_TYPE = "hive_column"; + private 
static final String HIVE_PROCESS_TYPE = "hive_process"; + private static final String STORAGE_DESC_TYPE = "StorageDesc"; + private static final String VIEW_TYPE = "View"; + + private TypesDef createTypeDefinitions() { + HierarchicalTypeDefinition<ClassType> dbClsDef + = TypesUtil.createClassTypeDef(DATABASE_TYPE, null, + attrDef("name", DataTypes.STRING_TYPE), + attrDef("description", DataTypes.STRING_TYPE), + attrDef("locationUri", DataTypes.STRING_TYPE), + attrDef("owner", DataTypes.STRING_TYPE), + attrDef("createTime", DataTypes.LONG_TYPE) + ); + + HierarchicalTypeDefinition<ClassType> storageDescClsDef = + TypesUtil.createClassTypeDef(STORAGE_DESC_TYPE, null, + attrDef("location", DataTypes.STRING_TYPE), + attrDef("inputFormat", DataTypes.STRING_TYPE), + attrDef("outputFormat", DataTypes.STRING_TYPE), + attrDef("compressed", DataTypes.STRING_TYPE, + Multiplicity.REQUIRED, false, null) + ); + + HierarchicalTypeDefinition<ClassType> columnClsDef = + TypesUtil.createClassTypeDef(COLUMN_TYPE, null, + attrDef("name", DataTypes.STRING_TYPE), + attrDef("dataType", DataTypes.STRING_TYPE), + attrDef("comment", DataTypes.STRING_TYPE) + ); + + HierarchicalTypeDefinition<ClassType> tblClsDef = + TypesUtil.createClassTypeDef(HIVE_TABLE_TYPE, ImmutableList.of("DataSet"), + attrDef("owner", DataTypes.STRING_TYPE), + attrDef("createTime", DataTypes.LONG_TYPE), + attrDef("lastAccessTime", DataTypes.LONG_TYPE), + attrDef("tableType", DataTypes.STRING_TYPE), + attrDef("temporary", DataTypes.BOOLEAN_TYPE), + new AttributeDefinition("db", DATABASE_TYPE, + Multiplicity.REQUIRED, false, null), + // todo - uncomment this, something is broken +// new AttributeDefinition("sd", STORAGE_DESC_TYPE, +// Multiplicity.REQUIRED, true, null), + new AttributeDefinition("columns", + DataTypes.arrayTypeName(COLUMN_TYPE), + Multiplicity.COLLECTION, true, null) + ); + + HierarchicalTypeDefinition<ClassType> loadProcessClsDef = + TypesUtil.createClassTypeDef(HIVE_PROCESS_TYPE, ImmutableList.of("Process"), + attrDef("userName", DataTypes.STRING_TYPE), + attrDef("startTime", DataTypes.LONG_TYPE), + attrDef("endTime", DataTypes.LONG_TYPE), + attrDef("queryText", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), + attrDef("queryPlan", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), + attrDef("queryId", DataTypes.STRING_TYPE, Multiplicity.REQUIRED), + attrDef("queryGraph", DataTypes.STRING_TYPE, Multiplicity.REQUIRED) + ); + + HierarchicalTypeDefinition<ClassType> viewClsDef = + TypesUtil.createClassTypeDef(VIEW_TYPE, null, + attrDef("name", DataTypes.STRING_TYPE), + new AttributeDefinition("db", DATABASE_TYPE, + Multiplicity.REQUIRED, false, null), + new AttributeDefinition("inputTables", + DataTypes.arrayTypeName(HIVE_TABLE_TYPE), + Multiplicity.COLLECTION, false, null) + ); + + HierarchicalTypeDefinition<TraitType> dimTraitDef = + TypesUtil.createTraitTypeDef("Dimension", null); + + HierarchicalTypeDefinition<TraitType> factTraitDef = + TypesUtil.createTraitTypeDef("Fact", null); + + HierarchicalTypeDefinition<TraitType> metricTraitDef = + TypesUtil.createTraitTypeDef("Metric", null); + + HierarchicalTypeDefinition<TraitType> etlTraitDef = + TypesUtil.createTraitTypeDef("ETL", null); + + HierarchicalTypeDefinition<TraitType> piiTraitDef = + TypesUtil.createTraitTypeDef("PII", null); + + HierarchicalTypeDefinition<TraitType> jdbcTraitDef = + TypesUtil.createTraitTypeDef("JdbcAccess", null); + + return TypeUtils.getTypesDef( + ImmutableList.<EnumTypeDefinition>of(), + ImmutableList.<StructTypeDefinition>of(), + 
ImmutableList.of(dimTraitDef, factTraitDef, + piiTraitDef, metricTraitDef, etlTraitDef, jdbcTraitDef), + ImmutableList.of(dbClsDef, storageDescClsDef, columnClsDef, + tblClsDef, loadProcessClsDef, viewClsDef) + ); + } + + AttributeDefinition attrDef(String name, IDataType dT) { + return attrDef(name, dT, Multiplicity.OPTIONAL, false, null); + } + + AttributeDefinition attrDef(String name, IDataType dT, Multiplicity m) { + return attrDef(name, dT, m, false, null); + } + + AttributeDefinition attrDef(String name, IDataType dT, + Multiplicity m, boolean isComposite, String reverseAttributeName) { + Preconditions.checkNotNull(name); + Preconditions.checkNotNull(dT); + return new AttributeDefinition(name, dT.getName(), m, isComposite, reverseAttributeName); + } + + private void setupInstances() throws Exception { + Id salesDB = database( + "Sales", "Sales Database", "John ETL", "hdfs://host:8000/apps/warehouse/sales"); + + Referenceable sd = storageDescriptor("hdfs://host:8000/apps/warehouse/sales", + "TextInputFormat", "TextOutputFormat", true); + + List<Referenceable> salesFactColumns = ImmutableList.of( + column("time_id", "int", "time id"), + column("product_id", "int", "product id"), + column("customer_id", "int", "customer id", "PII"), + column("sales", "double", "product id", "Metric") + ); + + Id salesFact = table("sales_fact", "sales fact table", + salesDB, sd, "Joe", "Managed", salesFactColumns, "Fact"); + + List<Referenceable> timeDimColumns = ImmutableList.of( + column("time_id", "int", "time id"), + column("dayOfYear", "int", "day Of Year"), + column("weekDay", "int", "week Day") + ); + + Id timeDim = table("time_dim", "time dimension table", + salesDB, sd, "John Doe", "External", timeDimColumns, "Dimension"); + + Id reportingDB = database("Reporting", "reporting database", "Jane BI", + "hdfs://host:8000/apps/warehouse/reporting"); + + Id salesFactDaily = table("sales_fact_daily_mv", + "sales fact daily materialized view", + reportingDB, sd, "Joe BI", "Managed", salesFactColumns, "Metric"); + + loadProcess("loadSalesDaily", "hive query for daily summary", "John ETL", + ImmutableList.of(salesFact, timeDim), ImmutableList.of(salesFactDaily), + "create table as select ", "plan", "id", "graph", + "ETL"); + + List<Referenceable> productDimColumns = ImmutableList.of( + column("product_id", "int", "product id"), + column("product_name", "string", "product name"), + column("brand_name", "int", "brand name") + ); + + Id productDim = table("product_dim", "product dimension table", + salesDB, sd, "John Doe", "Managed", productDimColumns, "Dimension"); + + view("product_dim_view", reportingDB, + ImmutableList.of(productDim), "Dimension", "JdbcAccess"); + + List<Referenceable> customerDimColumns = ImmutableList.of( + column("customer_id", "int", "customer id", "PII"), + column("name", "string", "customer name", "PII"), + column("address", "string", "customer address", "PII") + ); + + Id customerDim = table("customer_dim", "customer dimension table", + salesDB, sd, "fetl", "External", customerDimColumns, "Dimension"); + + view("customer_dim_view", reportingDB, + ImmutableList.of(customerDim), "Dimension", "JdbcAccess"); + + Id salesFactMonthly = table("sales_fact_monthly_mv", + "sales fact monthly materialized view", + reportingDB, sd, "Jane BI", "Managed", salesFactColumns, "Metric"); + + loadProcess("loadSalesMonthly", "hive query for monthly summary", "John ETL", + ImmutableList.of(salesFactDaily), ImmutableList.of(salesFactMonthly), + "create table as select ", "plan", "id", "graph", + 
"ETL"); + } + + Id database(String name, String description, + String owner, String locationUri, + String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("locationUri", locationUri); + referenceable.set("createTime", System.currentTimeMillis()); + + return createInstance(referenceable); + } + + Referenceable storageDescriptor(String location, String inputFormat, + String outputFormat, + boolean compressed) throws Exception { + Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE); + referenceable.set("location", location); + referenceable.set("inputFormat", inputFormat); + referenceable.set("outputFormat", outputFormat); + referenceable.set("compressed", compressed); + + return referenceable; + } + + Referenceable column(String name, String dataType, String comment, + String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("dataType", dataType); + referenceable.set("comment", comment); + + return referenceable; + } + + Id table(String name, String description, + Id dbId, Referenceable sd, + String owner, String tableType, + List<Referenceable> columns, + String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(HIVE_TABLE_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("owner", owner); + referenceable.set("tableType", tableType); + referenceable.set("temporary", false); + referenceable.set("createTime", System.currentTimeMillis()); + referenceable.set("lastAccessTime", System.currentTimeMillis()); + referenceable.set("retention", System.currentTimeMillis()); + + referenceable.set("db", dbId); + // todo - uncomment this, something is broken + // referenceable.set("sd", sd); + referenceable.set("columns", columns); + + return createInstance(referenceable); + } + + Id loadProcess(String name, String description, String user, + List<Id> inputTables, + List<Id> outputTables, + String queryText, String queryPlan, + String queryId, String queryGraph, + String... traitNames) throws Exception { + Referenceable referenceable = new Referenceable(HIVE_PROCESS_TYPE, traitNames); + referenceable.set("name", name); + referenceable.set("description", description); + referenceable.set("user", user); + referenceable.set("startTime", System.currentTimeMillis()); + referenceable.set("endTime", System.currentTimeMillis() + 10000); + + referenceable.set("inputs", inputTables); + referenceable.set("outputs", outputTables); + + referenceable.set("queryText", queryText); + referenceable.set("queryPlan", queryPlan); + referenceable.set("queryId", queryId); + referenceable.set("queryGraph", queryGraph); + + return createInstance(referenceable); + } + + Id view(String name, Id dbId, + List<Id> inputTables, + String... 
+            String... traitNames) throws Exception {
+        Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
+        referenceable.set("name", name);
+        referenceable.set("db", dbId);
+
+        referenceable.set("inputTables", inputTables);
+
+        return createInstance(referenceable);
+    }
+
+    private Id createInstance(Referenceable referenceable) throws Exception {
+        String typeName = referenceable.getTypeName();
+        System.out.println("creating instance of type " + typeName);
+
+        String entityJSON = InstanceSerialization.toJson(referenceable, true);
+        System.out.println("Submitting new entity= " + entityJSON);
+        String guid = metadataService.createEntity(entityJSON);
+        System.out.println("created instance for type " + typeName + ", guid: " + guid);
+
+        // return the reference to the created instance, with its guid
+        return new Id(guid, 0, referenceable.getTypeName());
+    }
+}
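[The helpers above (column(), table(), loadProcess(), view()) are what keep setupInstances() compact. Purely as an illustration of how they compose, and not as part of this patch, adding one more dimension table inside setupInstances() would look like the following; "store_dim" and its columns are invented for the example:

    List<Referenceable> storeDimColumns = ImmutableList.of(
            column("store_id", "int", "store id"),
            column("store_name", "string", "store name"));

    // table() submits the entity via createInstance() and returns its Id,
    // exactly as for the other sample tables above
    Id storeDim = table("store_dim", "store dimension table",
            salesDB, sd, "John Doe", "External", storeDimColumns, "Dimension");
]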

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/30711973/repository/src/test/java/org/apache/atlas/repository/BaseTest.java
----------------------------------------------------------------------
diff --git a/repository/src/test/java/org/apache/atlas/repository/BaseTest.java b/repository/src/test/java/org/apache/atlas/repository/BaseTest.java
new file mode 100755
index 0000000..1b1f692
--- /dev/null
+++ b/repository/src/test/java/org/apache/atlas/repository/BaseTest.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.repository;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.atlas.MetadataException;
+import org.apache.atlas.repository.memory.MemRepository;
+import org.apache.atlas.typesystem.ITypedReferenceableInstance;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.atlas.typesystem.Struct;
+import org.apache.atlas.typesystem.types.AttributeDefinition;
+import org.apache.atlas.typesystem.types.ClassType;
+import org.apache.atlas.typesystem.types.DataTypes;
+import org.apache.atlas.typesystem.types.HierarchicalType;
+import org.apache.atlas.typesystem.types.HierarchicalTypeDefinition;
+import org.apache.atlas.typesystem.types.IDataType;
+import org.apache.atlas.typesystem.types.Multiplicity;
+import org.apache.atlas.typesystem.types.StructType;
+import org.apache.atlas.typesystem.types.StructTypeDefinition;
+import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeSystem;
+import org.apache.atlas.typesystem.types.utils.TypesUtil;
+import org.junit.Before;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Date;
+import java.util.Map;
+
+public abstract class BaseTest {
+
+    public static final String STRUCT_TYPE_1 = "t1";
+    public static final String STRUCT_TYPE_2 = "t2";
+    public static final String TEST_DATE = "2014-12-11T02:35:58.440Z";
+    public static final long TEST_DATE_IN_LONG = 1418265358440L;
+    protected IRepository repo;
+
+    public static Struct createStruct() throws MetadataException {
+        StructType structType = (StructType) TypeSystem.getInstance()
+                .getDataType(StructType.class, STRUCT_TYPE_1);
+        Struct s = new Struct(structType.getName());
+        s.set("a", 1);
+        s.set("b", true);
+        s.set("c", (byte) 1);
+        s.set("d", (short) 2);
+        s.set("e", 1);
+        s.set("f", 1);
+        s.set("g", 1L);
+        s.set("h", 1.0f);
+        s.set("i", 1.0);
+        s.set("j", BigInteger.valueOf(1L));
+        s.set("k", new BigDecimal(1));
+        s.set("l", new Date(TEST_DATE_IN_LONG));
+        s.set("m", Lists.<Integer>asList(Integer.valueOf(1), new Integer[]{Integer.valueOf(1)}));
+        s.set("n", Lists.<BigDecimal>asList(BigDecimal.valueOf(1.1),
+                new BigDecimal[]{BigDecimal.valueOf(1.1)}));
+        Map<String, Double> hm = Maps.<String, Double>newHashMap();
+        hm.put("a", 1.0);
+        hm.put("b", 2.0);
+        s.set("o", hm);
+        return s;
+    }
+
+    protected final TypeSystem getTypeSystem() {
+        return TypeSystem.getInstance();
+    }
+
+    protected final IRepository getRepository() {
+        return repo;
+    }
+
+    @Before
+    public void setup() throws Exception {
+
+        TypeSystem ts = TypeSystem.getInstance();
+        ts.reset();
+        repo = new MemRepository(ts);
+
+        StructType structType = ts.defineStructType(STRUCT_TYPE_1,
+                true,
+                TypesUtil.createRequiredAttrDef("a", DataTypes.INT_TYPE),
+                TypesUtil.createOptionalAttrDef("b", DataTypes.BOOLEAN_TYPE),
+                TypesUtil.createOptionalAttrDef("c", DataTypes.BYTE_TYPE),
+                TypesUtil.createOptionalAttrDef("d", DataTypes.SHORT_TYPE),
+                TypesUtil.createOptionalAttrDef("e", DataTypes.INT_TYPE),
+                TypesUtil.createOptionalAttrDef("f", DataTypes.INT_TYPE),
+                TypesUtil.createOptionalAttrDef("g", DataTypes.LONG_TYPE),
+                TypesUtil.createOptionalAttrDef("h", DataTypes.FLOAT_TYPE),
+                TypesUtil.createOptionalAttrDef("i", DataTypes.DOUBLE_TYPE),
+                TypesUtil.createOptionalAttrDef("j", DataTypes.BIGINTEGER_TYPE),
+                TypesUtil.createOptionalAttrDef("k", DataTypes.BIGDECIMAL_TYPE),
+                TypesUtil.createOptionalAttrDef("l", DataTypes.DATE_TYPE),
TypesUtil.createOptionalAttrDef("m", ts.defineArrayType(DataTypes.INT_TYPE)), + TypesUtil.createOptionalAttrDef("n", ts.defineArrayType(DataTypes.BIGDECIMAL_TYPE)), + TypesUtil.createOptionalAttrDef("o", + ts.defineMapType(DataTypes.STRING_TYPE, DataTypes.DOUBLE_TYPE))); + + StructType recursiveStructType = ts.defineStructType(STRUCT_TYPE_2, + true, + TypesUtil.createRequiredAttrDef("a", DataTypes.INT_TYPE), + TypesUtil.createOptionalAttrDef("s", STRUCT_TYPE_2)); + } + + protected Map<String, IDataType> defineTraits(HierarchicalTypeDefinition... tDefs) + throws MetadataException { + + return getTypeSystem().defineTraitTypes(tDefs); + } + + /* + * Class Hierarchy is: + * Department(name : String, employees : Array[Person]) + * Person(name : String, department : Department, manager : Manager) + * Manager(subordinates : Array[Person]) extends Person + * + * Persons can have SecurityClearance(level : Int) clearance. + */ + protected void defineDeptEmployeeTypes(TypeSystem ts) throws MetadataException { + + HierarchicalTypeDefinition<ClassType> deptTypeDef = + TypesUtil.createClassTypeDef("Department", ImmutableList.<String>of(), + TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE), + new AttributeDefinition("employees", String.format("array<%s>", "Person"), + Multiplicity.COLLECTION, true, "department") + ); + HierarchicalTypeDefinition<ClassType> personTypeDef = + TypesUtil.createClassTypeDef("Person", ImmutableList.<String>of(), + TypesUtil.createRequiredAttrDef("name", DataTypes.STRING_TYPE), + new AttributeDefinition("department", + "Department", Multiplicity.REQUIRED, false, "employees"), + new AttributeDefinition("manager", + "Manager", Multiplicity.OPTIONAL, false, "subordinates") + ); + HierarchicalTypeDefinition<ClassType> managerTypeDef = + TypesUtil.createClassTypeDef("Manager", + ImmutableList.<String>of("Person"), + new AttributeDefinition("subordinates", + String.format("array<%s>", "Person"), + Multiplicity.COLLECTION, false, "manager") + ); + + HierarchicalTypeDefinition<TraitType> securityClearanceTypeDef = + TypesUtil.createTraitTypeDef("SecurityClearance", + ImmutableList.<String>of(), + TypesUtil.createRequiredAttrDef("level", DataTypes.INT_TYPE) + ); + + ts.defineTypes(ImmutableList.<StructTypeDefinition>of(), + ImmutableList.<HierarchicalTypeDefinition<TraitType>>of(securityClearanceTypeDef), + ImmutableList.<HierarchicalTypeDefinition<ClassType>>of(deptTypeDef, personTypeDef, + managerTypeDef)); + + ImmutableList<HierarchicalType> types = ImmutableList.of( + ts.getDataType(HierarchicalType.class, "SecurityClearance"), + ts.getDataType(ClassType.class, "Department"), + ts.getDataType(ClassType.class, "Person"), + ts.getDataType(ClassType.class, "Manager") + ); + + repo.defineTypes(types); + + } + + protected Referenceable createDeptEg1(TypeSystem ts) throws MetadataException { + Referenceable hrDept = new Referenceable("Department"); + Referenceable john = new Referenceable("Person"); + Referenceable jane = new Referenceable("Manager", "SecurityClearance"); + + hrDept.set("name", "hr"); + john.set("name", "John"); + john.set("department", hrDept); + jane.set("name", "Jane"); + jane.set("department", hrDept); + + john.set("manager", jane); + + hrDept.set("employees", ImmutableList.<Referenceable>of(john, jane)); + + jane.set("subordinates", ImmutableList.<Referenceable>of(john)); + + jane.getTrait("SecurityClearance").set("level", 1); + + ClassType deptType = ts.getDataType(ClassType.class, "Department"); + ITypedReferenceableInstance hrDept2 = 
+                deptType.convert(hrDept, Multiplicity.REQUIRED);
+
+        return hrDept;
+    }
+}
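[A minimal concrete test built on this scaffolding could look like the sketch below. It uses only members introduced in this patch plus JUnit; the class name and the assertion are illustrative, not part of the change. The inherited @Before setup() resets the TypeSystem and creates the MemRepository before each test:

    public class DeptEmployeeSmokeTest extends BaseTest {

        @org.junit.Test
        public void deptGraphConvertsCleanly() throws Exception {
            TypeSystem ts = getTypeSystem();
            defineDeptEmployeeTypes(ts);                 // registers Department/Person/Manager + SecurityClearance

            Referenceable hrDept = createDeptEg1(ts);    // builds the cyclic department/employee graph

            ClassType deptType = ts.getDataType(ClassType.class, "Department");
            ITypedReferenceableInstance typed = deptType.convert(hrDept, Multiplicity.REQUIRED);
            org.junit.Assert.assertEquals("hr", typed.get("name"));
        }
    }
]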
