Repository: incubator-atlas
Updated Branches:
  refs/heads/master 3e4f28f50 -> d2d6ff7d1
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
index f65cedb..33513c5 100755
--- a/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/GremlinTest2.scala
@@ -18,38 +18,42 @@
 
 package org.apache.atlas.query
 
-import com.thinkaurelius.titan.core.TitanGraph
-import com.thinkaurelius.titan.core.util.TitanCleanup
+import org.apache.atlas.TestUtils
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy
-import org.apache.atlas.query.Expressions._
-import org.apache.atlas.repository.graph.{TitanGraphProvider, GraphBackedMetadataRepository}
+import org.apache.atlas.query.Expressions._class
+import org.apache.atlas.query.Expressions._trait
+import org.apache.atlas.query.Expressions.id
+import org.apache.atlas.repository.graph.GraphBackedMetadataRepository
+import org.apache.atlas.repository.graphdb.AtlasGraph
 import org.apache.atlas.typesystem.types.TypeSystem
-import org.testng.annotations.{Test,BeforeClass,AfterClass}
+import org.testng.annotations.AfterClass
+import org.testng.annotations.BeforeClass
+import org.testng.annotations.BeforeMethod
+import org.testng.annotations.Test
+import org.apache.atlas.repository.graph.AtlasGraphProvider
 
 class GremlinTest2 extends BaseGremlinTest {
 
-  var g: TitanGraph = null
-  var gProvider:TitanGraphProvider = null;
+  var g: AtlasGraph[_,_] = null
   var gp:GraphPersistenceStrategies = null;
 
+  @BeforeMethod
+  def resetRequestContext() {
+    TestUtils.resetRequestContext();
+  }
+
   @BeforeClass
   def beforeAll() {
     TypeSystem.getInstance().reset()
     QueryTestsUtils.setupTypes
-    gProvider = new TitanGraphProvider();
-    gp = new DefaultGraphPersistenceStrategy(new GraphBackedMetadataRepository(gProvider, null))
-    g = QueryTestsUtils.setupTestGraph(gProvider)
+    var repo = new GraphBackedMetadataRepository(null);
+    gp = new DefaultGraphPersistenceStrategy(repo)
+    g = QueryTestsUtils.setupTestGraph(repo)
   }
 
   @AfterClass
-  def afterAll() {
-    g.shutdown()
-    try {
-      TitanCleanup.clear(g);
-    } catch {
-      case ex: Exception =>
-        print("Could not clear the graph ", ex);
-    }
+  def afterAll() {
+    AtlasGraphProvider.cleanup();
   }
 
   @Test def testTraitSelect {
@@ -111,7 +115,7 @@ class GremlinTest2 extends BaseGremlinTest {
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).evaluate()
+      None, Some(List("name")), true, getPersistenceStrategy(g), g).evaluate()
     validateJson(r)
   }
@@ -120,7 +124,7 @@ class GremlinTest2 extends BaseGremlinTest {
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph
+      None, Some(List("name")), true, getPersistenceStrategy(g), g).graph
 
     println(r.toInstanceJson)
     //validateJson(r)
@@ -131,7 +135,7 @@ class GremlinTest2 extends BaseGremlinTest {
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).evaluate()
+      None, Some(List("name")), true, getPersistenceStrategy(g), g).evaluate()
     validateJson(r)
   }
@@ -140,8 +144,12 @@ class GremlinTest2 extends BaseGremlinTest {
       "LoadProcess",
       "inputTables",
       "outputTable",
-      None, Some(List("name")), true, GraphPersistenceStrategy1, g).graph
+      None, Some(List("name")), true, getPersistenceStrategy(g), g).graph
 
     println(r.toInstanceJson)
   }
+
+  private def getPersistenceStrategy(g: AtlasGraph[_,_]) : GraphPersistenceStrategies = {
+    return GraphPersistenceStrategy1(g);
+  }
 }
\ No newline at end of file
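Note: the rewritten tests no longer manage a TitanGraph directly; they go through the new AtlasGraphProvider abstraction. A minimal sketch of that lifecycle, assuming only the two static methods visible in this diff (the wrapper class below is hypothetical, added for illustration):

    import org.apache.atlas.repository.graph.AtlasGraphProvider;
    import org.apache.atlas.repository.graphdb.AtlasGraph;

    public class GraphLifecycleSketch {
        public static void main(String[] args) {
            // getGraphInstance() returns the shared, lazily initialized graph
            AtlasGraph<?, ?> g = AtlasGraphProvider.getGraphInstance();
            try {
                // ... run queries against g ...
            } finally {
                // replaces the old g.shutdown() + TitanCleanup.clear(g) pair
                AtlasGraphProvider.cleanup();
            }
        }
    }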
"${this.getClass.getSimpleName}.${f.getName}" : $convertedVal""") - } - } - - this.getClass.getDeclaredFields filter (_.getName == "traits") foreach { f => - f.setAccessible(true) - var traits = f.get(this).asInstanceOf[Option[List[Trait]]] - - if (traits.isDefined) { - val fV = traits.get.map(_.getClass.getSimpleName).mkString(",") - sb.append( s""", "${Constants.TRAIT_NAMES_PROPERTY_KEY}" : "$fV"""") - } - } - - sb.append("}") - vertices += sb.toString() - } - - def isPrimitiveType(ls: List[_]) : Boolean = { - ls.head match { - case _: String => true - case _: Byte => true - case _: Short => true - case _: Int => true - case _: Long => true - case _: Float => true - case _: Double => true - case _: BigDecimal => true - case _: BigInt => true - case _: Boolean => true - case default => false - } - } - } - - trait Trait extends Vertex - - trait Struct extends Vertex - - trait Instance extends Vertex { - val traits: Option[List[Trait]] - - override def toGSon(vertices: ArrayBuffer[String], - edges: ArrayBuffer[String]): Unit = { - super.toGSon(vertices, edges) - - if (traits.isDefined) { - traits.get foreach { t => - t.toGSon(vertices, edges) - addEdge(t, s"${t.getClass.getSimpleName}", edges) - } - } - } - - } - - case class JdbcAccess(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait - - case class PII(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait - - case class Dimension(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait - - case class Metric(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait - - case class ETL(_id: String = "" + nextVertexId.incrementAndGet()) extends Trait - - - case class DB(name: String, owner: String, createTime: Int, clusterName: String, traits: Option[List[Trait]] = None, - _id: String = "" + nextVertexId.incrementAndGet()) extends Instance - - case class HiveOrder(col: String, order: Int, - _id: String = "" + nextVertexId.incrementAndGet()) extends Struct - - case class StorageDescriptor(inputFormat: String, outputFormat: String, - sortCols: List[Struct], _id: String = "" + nextVertexId.incrementAndGet()) extends Struct { - - override def toGSon(vertices: ArrayBuffer[String], - edges: ArrayBuffer[String]): Unit = { - sortCols.foreach(_.toGSon(vertices, edges)) - super.toGSon(vertices, edges) - } - } - - case class Column(name: String, dataType: String, sd: StorageDescriptor, - traits: Option[List[Trait]] = None, - _id: String = "" + nextVertexId.incrementAndGet()) extends Instance - - case class Table(name: String, db: DB, sd: StorageDescriptor, - created: Date, - traits: Option[List[Trait]] = None, - _id: String = "" + nextVertexId.incrementAndGet()) extends Instance - case class TableDef(name: String, db: DB, sd: StorageDescriptor, - columns: List[(String, String, Option[List[Trait]])], - traits: Option[List[Trait]] = None, - created: Option[Date] = None) { - val createdDate : Date = created match { - case Some(x) => x - case None => new Date(TestUtils.TEST_DATE_IN_LONG) - } - val colDefs = columns map { c => - Column(c._1, c._2, sd, c._3) - } - val tablDef = Table(name, db, sd, createdDate, traits) - - def toGSon(vertices: ArrayBuffer[String], - edges: ArrayBuffer[String]): Unit = { - sd.toGSon(vertices, edges) - colDefs foreach { - _.toGSon(vertices, edges) - } - tablDef.toGSon(vertices, edges) - } - } - - case class Partition(values: List[String], table: Table, traits: Option[List[Trait]] = None, - _id: String = "" + nextVertexId.incrementAndGet()) extends Instance - - case class LoadProcess(name: 
String, inputTables: List[Vertex], - outputTable: Vertex, - traits: Option[List[Trait]] = None, - _id: String = "" + nextVertexId.incrementAndGet()) extends Instance - - case class View(name: String, db: DB, inputTables: List[Vertex], - traits: Option[List[Trait]] = None, - _id: String = "" + nextVertexId.incrementAndGet()) extends Instance - - val salesDB = DB("Sales", "John ETL", 1000, "test") - val salesFact = TableDef("sales_fact", +object HiveTitanSample { + + val MetricTrait = "Metric" + val DimensionTrait = "Dimension" + val ETLTrait = "ETL" + val JdbcAccessTrait = "JdbcAccess" + + val salesDB = new DB("Sales", "John ETL", 1000, "test") + + + + val salesFact = new Table("sales_fact", salesDB, - StorageDescriptor("TextInputFormat", - "TextOutputFormat", List(HiveOrder("customer_id", 0))), - List( - ("time_id", "int", None), - ("product_id", "int", None), - ("customer_id", "int", None), - ("created", "date", None), - ("sales", "double", Some(List(Metric()))) - )) - val productDim = TableDef("product_dim", + new StorageDescriptor("TextInputFormat", + "TextOutputFormat", List(new HiveOrder("customer_id", 0))), + List( + new Column("time_id", "int"), + new Column("product_id", "int"), + new Column("customer_id", "int"), + new Column("created", "date"), + new Column("sales", "double").withTrait(MetricTrait) + ) + ); + + + val productDim = new Table("product_dim", salesDB, - StorageDescriptor("TextInputFormat", - "TextOutputFormat", List(HiveOrder("product_id", 0))), + new StorageDescriptor("TextInputFormat", + "TextOutputFormat", List(new HiveOrder("product_id", 0))), List( - ("product_id", "int", None), - ("product_name", "string", None), - ("brand_name", "string", None) - ), - Some(List(Dimension()))) - val timeDim = TableDef("time_dim", + new Column("product_id", "int"), + new Column("product_name", "string"), + new Column("brand_name", "string") + ) + ).withTrait(DimensionTrait) + + val timeDim = new Table("time_dim", salesDB, - StorageDescriptor("TextInputFormat", - "TextOutputFormat", List(HiveOrder("time_id", 0))), + new StorageDescriptor("TextInputFormat", + "TextOutputFormat", List(new HiveOrder("time_id", 0))), List( - ("time_id", "int", None), - ("dayOfYear", "int", None), - ("weekDay", "string", None) - ), - Some(List(Dimension()))) - val customerDim = TableDef("customer_dim", + new Column("time_id", "int"), + new Column("dayOfYear", "int"), + new Column("weekDay", "string") + ) + ).withTrait(DimensionTrait) + + val customerDim = new Table("customer_dim", salesDB, - StorageDescriptor("TextInputFormat", - "TextOutputFormat", List(HiveOrder("customer_id", 0))), + new StorageDescriptor("TextInputFormat", + "TextOutputFormat", List(new HiveOrder("customer_id", 0))), List( - ("customer_id", "int", None), - ("name", "int", None), - ("address", "string", Some(List(PII()))) - ), - Some(List(Dimension()))) + new Column("customer_id", "int"), + new Column("name", "int"), + new Column("address", "string").withTrait("PII") + ) + ).withTrait(DimensionTrait) + - val reportingDB = DB("Reporting", "Jane BI", 1500, "test") - val salesFactDaily = TableDef("sales_fact_daily_mv", + val reportingDB = new DB("Reporting", "Jane BI", 1500, "test") + val salesFactDaily = new Table("sales_fact_daily_mv", reportingDB, - StorageDescriptor("TextInputFormat", - "TextOutputFormat", List(HiveOrder("customer_id", 0))), + new StorageDescriptor("TextInputFormat", + "TextOutputFormat", List(new HiveOrder("customer_id", 0))), List( - ("time_id", "int", None), - ("product_id", "int", None), - ("customer_id", 
"int", None), - ("sales", "double", Some(List(Metric()))) - )) - val loadSalesFactDaily = LoadProcess("loadSalesDaily", - List(salesFact.tablDef, timeDim.tablDef), salesFactDaily.tablDef, - Some(List(ETL()))) + new Column("time_id", "int"), + new Column("product_id", "int"), + new Column("customer_id", "int"), + new Column("sales", "double").withTrait(MetricTrait) + ) + ) + + val loadSalesFactDaily = new LoadProcess( + "loadSalesDaily", + List(salesFact, timeDim), + salesFactDaily + ).withTrait(ETLTrait) + - val productDimView = View("product_dim_view", reportingDB, - List(productDim.tablDef), - Some(List(Dimension(), JdbcAccess()))) + val productDimView = new View( + "product_dim_view", + reportingDB, + List(productDim) + ).withTraits(List(DimensionTrait, JdbcAccessTrait)) - val customerDimView = View("customer_dim_view", reportingDB, - List(customerDim.tablDef), - Some(List(Dimension(), JdbcAccess()))) + val customerDimView = new View( + "customer_dim_view", + reportingDB, + List(customerDim) + + ).withTraits(List(DimensionTrait, JdbcAccessTrait)) - val salesFactMonthly = TableDef("sales_fact_monthly_mv", + val salesFactMonthly = new Table("sales_fact_monthly_mv", reportingDB, - StorageDescriptor("TextInputFormat", - "TextOutputFormat", List(HiveOrder("customer_id", 0))), + new StorageDescriptor( + "TextInputFormat", + "TextOutputFormat", + List(new HiveOrder("customer_id", 0)) + ), List( - ("time_id", "int", None), - ("product_id", "int", None), - ("customer_id", "int", None), - ("sales", "double", Some(List(Metric()))) - )) - val loadSalesFactMonthly = LoadProcess("loadSalesMonthly", - List(salesFactDaily.tablDef), salesFactMonthly.tablDef, - Some(List(ETL()))) - - val salesDailyPartition = Partition(List("2015-01-01"),salesFactDaily.tablDef) - - - val vertices: ArrayBuffer[String] = new ArrayBuffer[String]() - val edges: ArrayBuffer[String] = new ArrayBuffer[String]() - - salesDB.toGSon(vertices, edges) - salesFact.toGSon(vertices, edges) - productDim.toGSon(vertices, edges) - timeDim.toGSon(vertices, edges) - customerDim.toGSon(vertices, edges) - - reportingDB.toGSon(vertices, edges) - salesFactDaily.toGSon(vertices, edges) - loadSalesFactDaily.toGSon(vertices, edges) - productDimView.toGSon(vertices, edges) - customerDimView.toGSon(vertices, edges) - salesFactMonthly.toGSon(vertices, edges) - loadSalesFactMonthly.toGSon(vertices, edges) - salesDailyPartition.toGSon(vertices, edges) - - def toGSon(): String = { - s"""{ - "mode":"NORMAL", - "vertices": ${vertices.mkString("[\n\t", ",\n\t", "\n]")}, - "edges": ${edges.mkString("[\n\t", ",\n\t", "\n]")} - } - """.stripMargin - } - - def writeGson(fileName: String): Unit = { - FileUtils.writeStringToFile(new File(fileName), toGSon()) - } - + new Column("time_id", "int"), + new Column("product_id", "int"), + new Column("customer_id", "int"), + new Column("sales", "double").withTrait(MetricTrait) + ) + ) + val loadSalesFactMonthly = new LoadProcess("loadSalesMonthly", + List(salesFactDaily), salesFactMonthly).withTraits(List(ETLTrait)) + + val salesDailyPartition = new Partition(List("2015-01-01"), salesFactDaily) + + import scala.collection.JavaConversions._ + + def getEntitiesToCreate() : Buffer[ITypedReferenceableInstance] = { + var list = salesDB.getTypedReferencebles() ++ + salesFact.getTypedReferencebles() ++ + productDim.getTypedReferencebles() ++ + timeDim.getTypedReferencebles() ++ + customerDim.getTypedReferencebles() ++ + reportingDB.getTypedReferencebles() ++ + salesFactDaily.getTypedReferencebles() ++ + 
loadSalesFactDaily.getTypedReferencebles() ++ + productDimView.getTypedReferencebles() ++ + customerDimView.getTypedReferencebles() ++ + salesFactMonthly.getTypedReferencebles() ++ + loadSalesFactMonthly.getTypedReferencebles() ++ + salesDailyPartition.getTypedReferencebles(); + return list; + + } + + val GremlinQueries = List( // 1. List all DBs @@ -367,28 +216,28 @@ object HiveTitanSample { ) } -object TestApp extends App with GraphUtils { - - val g: TitanGraph = TitanGraphProvider.getGraphInstance - val manager: ScriptEngineManager = new ScriptEngineManager - val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy") - val bindings: Bindings = engine.createBindings - bindings.put("g", g) - - val hiveGraphFile = FileUtils.getTempDirectory().getPath + File.separator + System.nanoTime() + ".gson" - HiveTitanSample.writeGson(hiveGraphFile) - bindings.put("hiveGraphFile", hiveGraphFile) - - try { - engine.eval("g.loadGraphSON(hiveGraphFile)", bindings) - - println(engine.eval("g.V.typeName.toList()", bindings)) - - HiveTitanSample.GremlinQueries.foreach { q => - println(q) - println("Result: " + engine.eval(q + ".toList()", bindings)) - } - } finally { - g.shutdown() - } -} \ No newline at end of file +//object TestApp extends App with GraphUtils { +// +// val g: TitanGraph = TitanGraphProvider.getGraphInstance +// val manager: ScriptEngineManager = new ScriptEngineManager +// val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy") +// val bindings: Bindings = engine.createBindings +// bindings.put("g", g) +// +// val hiveGraphFile = FileUtils.getTempDirectory().getPath + File.separator + System.nanoTime() + ".gson" +// HiveTitanSample.writeGson(hiveGraphFile) +// bindings.put("hiveGraphFile", hiveGraphFile) +// +// try { +// engine.eval("g.loadGraphSON(hiveGraphFile)", bindings) +// +// println(engine.eval("g.V.typeName.toList()", bindings)) +// +// HiveTitanSample.GremlinQueries.foreach { q => +// println(q) +// println("Result: " + engine.eval(q + ".toList()", bindings)) +// } +// } finally { +// g.shutdown() +// } +//} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala ---------------------------------------------------------------------- diff --git a/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala b/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala index c8b635a..bb44686 100755 --- a/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala +++ b/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala @@ -18,43 +18,51 @@ package org.apache.atlas.query -import com.thinkaurelius.titan.core.TitanGraph -import com.thinkaurelius.titan.core.util.TitanCleanup +import org.apache.atlas.TestUtils import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy -import org.apache.atlas.query.Expressions._ -import org.apache.atlas.repository.graph.{GraphBackedMetadataRepository, TitanGraphProvider} +import org.apache.atlas.query.Expressions._class +import org.apache.atlas.query.Expressions.id +import org.apache.atlas.query.Expressions.int +import org.apache.atlas.repository.graph.AtlasGraphProvider +import org.apache.atlas.repository.graph.GraphBackedMetadataRepository +import org.apache.atlas.repository.graphdb.AtlasGraph import org.apache.atlas.typesystem.types.TypeSystem -import org.testng.annotations.{Test,BeforeClass,AfterClass} +import org.testng.annotations.AfterClass 
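Note: the sample data is now built with the fluent Java HiveModel classes instead of GraphSON strings. A hedged sketch of how those builders compose, assuming the constructors take java.util.List where the Scala call sites above pass a converted List (the wrapper class is hypothetical; withTrait and getTypedReferencebles appear in this diff):

    import java.util.Arrays;
    import org.apache.atlas.utils.HiveModel.Column;
    import org.apache.atlas.utils.HiveModel.DB;
    import org.apache.atlas.utils.HiveModel.HiveOrder;
    import org.apache.atlas.utils.HiveModel.StorageDescriptor;
    import org.apache.atlas.utils.HiveModel.Table;

    public class HiveModelSketch {
        public static void main(String[] args) {
            DB db = new DB("Sales", "John ETL", 1000, "test");
            StorageDescriptor sd = new StorageDescriptor("TextInputFormat",
                    "TextOutputFormat", Arrays.asList(new HiveOrder("customer_id", 0)));
            Table table = new Table("sales_fact", db, sd, Arrays.asList(
                    new Column("customer_id", "int"),
                    new Column("sales", "double").withTrait("Metric")));
            // each model object can emit the typed instances to persist
            System.out.println(table.getTypedReferencebles());
        }
    }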
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala b/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala
index c8b635a..bb44686 100755
--- a/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/LineageQueryTest.scala
@@ -18,43 +18,51 @@
 
 package org.apache.atlas.query
 
-import com.thinkaurelius.titan.core.TitanGraph
-import com.thinkaurelius.titan.core.util.TitanCleanup
+import org.apache.atlas.TestUtils
 import org.apache.atlas.discovery.graph.DefaultGraphPersistenceStrategy
-import org.apache.atlas.query.Expressions._
-import org.apache.atlas.repository.graph.{GraphBackedMetadataRepository, TitanGraphProvider}
+import org.apache.atlas.query.Expressions._class
+import org.apache.atlas.query.Expressions.id
+import org.apache.atlas.query.Expressions.int
+import org.apache.atlas.repository.graph.AtlasGraphProvider
+import org.apache.atlas.repository.graph.GraphBackedMetadataRepository
+import org.apache.atlas.repository.graphdb.AtlasGraph
 import org.apache.atlas.typesystem.types.TypeSystem
-import org.testng.annotations.{Test,BeforeClass,AfterClass}
+import org.testng.annotations.AfterClass
+import org.testng.annotations.BeforeClass
+import org.testng.annotations.BeforeMethod
+import org.testng.annotations.Test
 
 class LineageQueryTest extends BaseGremlinTest {
 
-  var g: TitanGraph = null
-  var gProvider:TitanGraphProvider = null;
+  var g: AtlasGraph[_,_] = null
   var gp:GraphPersistenceStrategies = null;
 
+  @BeforeMethod
+  def resetRequestContext() {
+    TestUtils.resetRequestContext()
+  }
+
+
   @BeforeClass
   def beforeAll() {
-    TypeSystem.getInstance().reset()
-    QueryTestsUtils.setupTypes
-    gProvider = new TitanGraphProvider();
-    gp = new DefaultGraphPersistenceStrategy(new GraphBackedMetadataRepository(gProvider, null))
-    g = QueryTestsUtils.setupTestGraph(gProvider)
+    TypeSystem.getInstance().reset()
+    var repo = new GraphBackedMetadataRepository(null);
+    TestUtils.setupGraphProvider(repo);
+    //force graph to be initialized first
+    AtlasGraphProvider.getGraphInstance();
+
+    //create types and indices up front. Without this, some of the property keys (particularly __traitNames and __superTypes)
+    //end up getting created implicitly with some graph backends, with the wrong multiplicity. This also makes the queries
+    //we execute perform better :-)
+    QueryTestsUtils.setupTypesAndIndices()
+
+    gp = new DefaultGraphPersistenceStrategy(repo);
+    g = QueryTestsUtils.setupTestGraph(repo)
   }
 
   @AfterClass
   def afterAll() {
-    try {
-      g.shutdown()
-    } catch {
-      case ex: Exception =>
-        print("Could not shutdown the graph ", ex);
-    }
-    try {
-      TitanCleanup.clear(g);
-    } catch {
-      case ex: Exception =>
-        print("Could not clear the graph ", ex);
-    }
+    AtlasGraphProvider.cleanup()
  }
 
  val PREFIX_SPACES_REGEX = ("\\n\\s*").r
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala
----------------------------------------------------------------------
diff --git a/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala b/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala
index b5faaf3..33275d3 100755
--- a/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala
+++ b/repository/src/test/scala/org/apache/atlas/query/QueryTestsUtils.scala
@@ -22,10 +22,8 @@ import java.io.File
 import javax.script.{Bindings, ScriptEngine, ScriptEngineManager}
 
 import com.google.common.collect.ImmutableList
-import com.thinkaurelius.titan.core.{TitanFactory, TitanGraph}
-import com.tinkerpop.blueprints.Vertex
+import org.apache.atlas.repository.graphdb.AtlasVertex
 import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.atlas.repository.graph.TitanGraphProvider
 import org.apache.atlas.typesystem.types._
 import org.apache.commons.configuration.{Configuration, ConfigurationException, MapConfiguration}
 import org.apache.commons.io.FileUtils
@@ -34,6 +32,13 @@ import org.json.JSONObject
 import org.skyscreamer.jsonassert.JSONAssert
 
 import scala.util.Random
+import org.apache.atlas.repository.MetadataRepository
+import org.apache.atlas.repository.graphdb.AtlasGraph
+import org.apache.atlas.repository.graph.AtlasGraphProvider
+import java.net.URL
+import org.apache.atlas.repository.graph.GraphBackedSearchIndexer
+import org.apache.atlas.typesystem.TypesDef
+import org.apache.atlas.typesystem.ITypedReferenceableInstance
 
 trait GraphUtils {
 
@@ -52,12 +57,12 @@ trait GraphUtils {
     }
 
-    def titanGraph(conf: Configuration) = {
+    def graph(conf: Configuration) = {
         try {
-            val g = TitanFactory.open(conf)
+            val g = AtlasGraphProvider.getGraphInstance
             val mgmt = g.getManagementSystem
-            val typname = mgmt.makePropertyKey("typeName").dataType(classOf[String]).make()
-            mgmt.buildIndex("byTypeName", classOf[Vertex]).addKey(typname).buildCompositeIndex()
+            val typname = mgmt.makePropertyKey("typeName", classOf[String], null);
+            mgmt.createExactMatchIndex("byTypeName", false, List(typname));
             mgmt.commit()
             g
         } catch {
@@ -68,7 +73,21 @@ trait GraphUtils {
 
 object QueryTestsUtils extends GraphUtils {
 
-    def setupTypes: Unit = {
+    def setupTypesAndIndices() : Unit = {
+        val indexer = new GraphBackedSearchIndexer();
+        val typesDef : TypesDef = defineTypes;
+        val newTypes = TypeSystem.getInstance.defineTypes(typesDef);
+        indexer.onAdd(newTypes.values());
+    }
+
+    def setupTypes: Unit = {
+
+        val types : TypesDef = defineTypes;
+        TypeSystem.getInstance.defineTypes(types);
+    }
+
+
+    def defineTypes: TypesDef = {
         def attrDef(name: String, dT: IDataType[_],
                     m: Multiplicity = Multiplicity.OPTIONAL,
                     isComposite: Boolean = false,
@@ -144,36 +163,29 @@ object QueryTestsUtils extends GraphUtils {
         def jdbcTraitDef = new HierarchicalTypeDefinition[TraitType](classOf[TraitType], "JdbcAccess", null, null,
             Array[AttributeDefinition]())
 
-        TypeSystem.getInstance().defineTypes(ImmutableList.of[EnumTypeDefinition],
-            ImmutableList.of[StructTypeDefinition](hiveOrderDef),
-            ImmutableList.of[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
+        TypesDef(Seq[EnumTypeDefinition](),
+            Seq[StructTypeDefinition](hiveOrderDef),
+            Seq[HierarchicalTypeDefinition[TraitType]](dimTraitDef, piiTraitDef,
                 metricTraitDef, etlTraitDef, jdbcTraitDef),
-            ImmutableList.of[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
+            Seq[HierarchicalTypeDefinition[ClassType]](dbClsDef, storageDescClsDef, columnClsDef, tblClsDef,
                 partitionClsDef, loadProcessClsDef, viewClsDef))
-
-        ()
     }
 
-    def setupTestGraph(gp: TitanGraphProvider): TitanGraph = {
-        var conf = TitanGraphProvider.getConfiguration
-        conf.setProperty("storage.directory",
-            conf.getString("storage.directory") + "/../graph-data/" + RandomStringUtils.randomAlphanumeric(10))
-        val g = TitanFactory.open(conf)
-        val manager: ScriptEngineManager = new ScriptEngineManager
-        val engine: ScriptEngine = manager.getEngineByName("gremlin-groovy")
-        val bindings: Bindings = engine.createBindings
-        bindings.put("g", g)
-
-        val hiveGraphFile = FileUtils.getTempDirectory().getPath + File.separator + System.nanoTime() + ".gson"
-        HiveTitanSample.writeGson(hiveGraphFile)
-        bindings.put("hiveGraphFile", hiveGraphFile)
-
-        engine.eval("g.loadGraphSON(hiveGraphFile)", bindings)
+    def setupTestGraph(repo : MetadataRepository): AtlasGraph[_,_] = {
+
+        val g = AtlasGraphProvider.getGraphInstance();
+        val entities = HiveTitanSample.getEntitiesToCreate();
+        repo.createEntities(entities:_*)
+        g.commit();
         g
     }
+
+
 }
 
+
+
 trait BaseGremlinTest {
   val STRUCT_NAME_REGEX = (TypeUtils.TEMP_STRUCT_NAME_PREFIX + "\\d+").r
   def validateJson(r: GremlinQueryResult, expected: String = null): Unit = {
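Note: the index bootstrap above now goes through the graph-database abstraction rather than Titan's management API. The same calls in Java form, a sketch only: the management-interface and property-key type names (AtlasGraphManagement, AtlasPropertyKey) are assumptions, while the method names and arguments come from the Scala call site above:

    import java.util.Collections;
    import org.apache.atlas.repository.graph.AtlasGraphProvider;
    import org.apache.atlas.repository.graphdb.AtlasGraph;
    import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
    import org.apache.atlas.repository.graphdb.AtlasPropertyKey;

    public class IndexSetupSketch {
        public static void main(String[] args) {
            AtlasGraph<?, ?> g = AtlasGraphProvider.getGraphInstance();
            AtlasGraphManagement mgmt = g.getManagementSystem();
            // null cardinality: let the backend pick its default (mirrors the Scala call)
            AtlasPropertyKey typeName = mgmt.makePropertyKey("typeName", String.class, null);
            // composite ("exact match") index over the single key, non-unique
            mgmt.createExactMatchIndex("byTypeName", false, Collections.singletonList(typeName));
            mgmt.commit();
        }
    }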
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
----------------------------------------------------------------------
diff --git a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
index 06da32e..18ef2ee 100755
--- a/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
+++ b/typesystem/src/main/java/org/apache/atlas/typesystem/types/Multiplicity.java
@@ -41,6 +41,11 @@ public final class Multiplicity {
         this.isUnique = isUnique;
     }
 
+    public boolean isMany() {
+        return upper > 1;
+    }
+
+
     public boolean nullAllowed() {
         return lower == 0;
     }
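Note: a quick illustration of the two predicates, using the multiplicity constants (OPTIONAL = [0,1], REQUIRED = [1,1], COLLECTION = [0,many]) that Multiplicity already defines; Multiplicity.OPTIONAL is used elsewhere in this diff:

    import org.apache.atlas.typesystem.types.Multiplicity;

    public class MultiplicitySketch {
        public static void main(String[] args) {
            System.out.println(Multiplicity.COLLECTION.isMany());    // true: upper bound > 1
            System.out.println(Multiplicity.OPTIONAL.isMany());      // false: at most one value
            System.out.println(Multiplicity.OPTIONAL.nullAllowed()); // true: lower bound is 0
            System.out.println(Multiplicity.REQUIRED.nullAllowed()); // false: lower bound is 1
        }
    }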
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/typesystem/src/test/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/test/resources/atlas-application.properties b/typesystem/src/test/resources/atlas-application.properties
index fb31462..108630b 100644
--- a/typesystem/src/test/resources/atlas-application.properties
+++ b/typesystem/src/test/resources/atlas-application.properties
@@ -19,6 +19,8 @@
 #system property
 atlas.data=${sys:user.dir}/target/data
 
+
+
 #re-use existing property
 atlas.graph.data=${atlas.data}/graph
 
@@ -30,10 +32,17 @@ atlas.db=${atlasdb}
 
 atlas.TypeSystem.impl=org.apache.atlas.typesystem.types.TypeSystem
 
+
+
 ######### Atlas Server Configs #########
 atlas.rest.address=http://localhost:31000
 
 ######### Graph Database Configs #########
+
+
+# Graph database implementation. Value inserted by maven.
+atlas.graphdb.backend=${graphdb.backend.impl}
+
 # Graph Storage
 atlas.graph.storage.backend=${titan.storage.backend}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 8fe4b9b..82f307c 100755
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -35,6 +35,7 @@
         <projectBaseDir>${project.basedir}/..</projectBaseDir>
         <debug.jetty.daemon>true</debug.jetty.daemon>
         <packages.to.exclude />
+        <log4j.configuration.url>file://${project.build.directory}/../../distro/src/conf/atlas-log4j.xml</log4j.configuration.url>
     </properties>
 
     <profiles>
@@ -85,6 +86,18 @@
                 <packages.to.exclude>WEB-INF/lib/je-*.jar</packages.to.exclude>
             </properties>
         </profile>
+
+        <profile>
+            <id>Windows</id>
+            <activation>
+                <os>
+                    <family>windows</family>
+                </os>
+            </activation>
+            <properties>
+                <log4j.configuration.url>file:/${project.build.directory}/../../distro/src/conf/atlas-log4j.xml</log4j.configuration.url>
+            </properties>
+        </profile>
     </profiles>
 
     <dependencies>
@@ -105,11 +118,6 @@
 
         <dependency>
             <groupId>org.apache.atlas</groupId>
-            <artifactId>atlas-graphdb-titan0</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.atlas</groupId>
             <artifactId>atlas-client</artifactId>
         </dependency>
 
@@ -148,6 +156,12 @@
             <artifactId>atlas-catalog</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.atlas</groupId>
+            <artifactId>atlas-graphdb-impls</artifactId>
+            <type>pom</type>
+        </dependency>
+
         <!-- supports simple auth handler -->
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
@@ -455,7 +469,7 @@
                     <systemProperties>
                         <systemProperty>
                             <name>log4j.configuration</name>
-                            <value>file://${project.build.directory}/../../distro/src/conf/atlas-log4j.xml</value>
+                            <value>${log4j.configuration.url}</value>
                         </systemProperty>
                         <systemProperty>
                             <name>atlas.log.file</name>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
index a1d3187..7a8a4f0 100755
--- a/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
+++ b/webapp/src/main/java/org/apache/atlas/web/listeners/GuiceServletConfig.java
@@ -18,26 +18,19 @@
 
 package org.apache.atlas.web.listeners;
 
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.google.inject.Provider;
-import com.google.inject.Stage;
-import com.google.inject.TypeLiteral;
-import com.google.inject.servlet.GuiceServletContextListener;
-import com.sun.jersey.api.core.PackagesResourceConfig;
-import com.sun.jersey.guice.JerseyServletModule;
-import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
-import com.thinkaurelius.titan.core.TitanGraph;
-import com.tinkerpop.blueprints.Graph;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.servlet.ServletContextEvent;
+
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.RepositoryMetadataModule;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.notification.NotificationModule;
-import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.service.Services;
 import org.apache.atlas.web.filters.ActiveServerFilter;
 import org.apache.atlas.web.filters.AuditFilter;
@@ -48,9 +41,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.bridge.SLF4JBridgeHandler;
 
-import javax.servlet.ServletContextEvent;
-import java.util.HashMap;
-import java.util.Map;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Stage;
+import com.google.inject.servlet.GuiceServletContextListener;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.guice.JerseyServletModule;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
 
 public class GuiceServletConfig extends GuiceServletContextListener {
 
@@ -159,10 +157,8 @@ public class GuiceServletConfig extends GuiceServletContextListener {
         if(injector != null) {
             //stop services
             stopServices();
-
-            TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
-            Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
-            final Graph graph = graphProvider.get().get();
+
+            final AtlasGraph graph = AtlasGraphProvider.getGraphInstance();
 
             try {
                 graph.shutdown();
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java b/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java
index d43c8cc..dfd29b1 100644
--- a/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java
+++ b/webapp/src/main/java/org/apache/atlas/web/resources/BaseService.java
@@ -18,22 +18,31 @@
 
 package org.apache.atlas.web.resources;
 
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
-import org.apache.atlas.catalog.*;
-import org.apache.atlas.catalog.exception.*;
-import org.apache.atlas.repository.graph.TitanGraphProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.UriInfo;
-import javax.xml.bind.annotation.XmlRootElement;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.util.Collection;
 import java.util.Map;
 
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.atlas.catalog.JsonSerializer;
+import org.apache.atlas.catalog.Request;
+import org.apache.atlas.catalog.ResourceProvider;
+import org.apache.atlas.catalog.Result;
+import org.apache.atlas.catalog.exception.CatalogException;
+import org.apache.atlas.catalog.exception.CatalogRuntimeException;
+import org.apache.atlas.catalog.exception.InvalidPayloadException;
+import org.apache.atlas.catalog.exception.InvalidQueryException;
+import org.apache.atlas.catalog.exception.ResourceNotFoundException;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
 /**
  * Base class for all v1 API services.
  */
@@ -135,7 +144,7 @@ public abstract class BaseService {
     //todo: abstract via AtlasTypeSystem
     // ensure that the thread wasn't re-pooled with an existing transaction
     protected void initializeGraphTransaction() {
-        TitanGraphProvider.getGraphInstance().rollback();
+        AtlasGraphProvider.getGraphInstance().rollback();
     }
 
     private RuntimeException wrapRuntimeException(RuntimeException e) {
http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
index 683a028..961154b 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -56,49 +56,55 @@ public class NotificationHookConsumerKafkaTest {
 
     @Test
     public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException, InterruptedException {
-        produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
-
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-                createNewConsumer(kafkaNotification, false);
-        LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
-        NotificationHookConsumer notificationHookConsumer =
-                new NotificationHookConsumer(kafkaNotification, localAtlasClient);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(consumer);
-
-        consumeOneMessage(consumer, hookConsumer);
-        verify(localAtlasClient).setUser("test_user1");
-
-        // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
-        produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
-        consumeOneMessage(consumer, hookConsumer);
-        verify(localAtlasClient).setUser("test_user2");
-
-        kafkaNotification.close();
+        try {
+            produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+
+            NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+                    createNewConsumer(kafkaNotification, false);
+            LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
+            NotificationHookConsumer notificationHookConsumer =
+                    new NotificationHookConsumer(kafkaNotification, localAtlasClient);
+            NotificationHookConsumer.HookConsumer hookConsumer =
+                    notificationHookConsumer.new HookConsumer(consumer);
+
+            consumeOneMessage(consumer, hookConsumer);
+            verify(localAtlasClient).setUser("test_user1");
+
+            // produce another message, and make sure it moves ahead. If commit succeeded, this would work.
+            produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+            consumeOneMessage(consumer, hookConsumer);
+            verify(localAtlasClient).setUser("test_user2");
+        }
+        finally {
+            kafkaNotification.close();
+        }
     }
 
     @Test(dependsOnMethods = "testConsumerConsumesNewMessageWithAutoCommitDisabled")
     public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled() throws Exception {
-        produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
-
-        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-                createNewConsumer(kafkaNotification, true);
-        LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
-        NotificationHookConsumer notificationHookConsumer =
-                new NotificationHookConsumer(kafkaNotification, localAtlasClient);
-        NotificationHookConsumer.HookConsumer hookConsumer =
-                notificationHookConsumer.new HookConsumer(consumer);
-
-        consumeOneMessage(consumer, hookConsumer);
-        verify(localAtlasClient).setUser("test_user3");
-
-        // produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
-        produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
-
-        consumeOneMessage(consumer, hookConsumer);
-        verify(localAtlasClient).setUser("test_user3");
-
-        kafkaNotification.close();
+        try {
+            produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
+
+            NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+                    createNewConsumer(kafkaNotification, true);
+            LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
+            NotificationHookConsumer notificationHookConsumer =
+                    new NotificationHookConsumer(kafkaNotification, localAtlasClient);
+            NotificationHookConsumer.HookConsumer hookConsumer =
+                    notificationHookConsumer.new HookConsumer(consumer);
+
+            consumeOneMessage(consumer, hookConsumer);
+            verify(localAtlasClient).setUser("test_user3");
+
+            // produce another message, but this will not be consumed, as commit code is not executed in hook consumer.
+            produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
+
+            consumeOneMessage(consumer, hookConsumer);
+            verify(localAtlasClient).setUser("test_user3");
+        }
+        finally {
+            kafkaNotification.close();
+        }
    }
 
    NotificationConsumer<HookNotification.HookNotificationMessage> createNewConsumer(
@@ -126,6 +132,7 @@ public class NotificationHookConsumerKafkaTest {
         final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE);
         entity.set("name", "db" + randomString());
         entity.set("description", randomString());
+        entity.set("qualifiedName", randomString());
         return entity;
     }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/d2d6ff7d/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java b/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
index 08bb125..88cfc63 100644
--- a/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
+++ b/webapp/src/test/java/org/apache/atlas/web/listeners/TestGuiceServletConfig.java
@@ -16,20 +16,18 @@
  */
 package org.apache.atlas.web.listeners;
 
-import com.google.inject.Key;
-import com.google.inject.Module;
-import com.google.inject.Provider;
-import com.google.inject.TypeLiteral;
-import com.thinkaurelius.titan.core.TitanGraph;
-import com.thinkaurelius.titan.core.util.TitanCleanup;
+import javax.servlet.ServletContextEvent;
+
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.repository.graph.GraphProvider;
+import org.apache.atlas.repository.graph.AtlasGraphProvider;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.ServletContextEvent;
+import com.google.inject.Module;
+import com.thinkaurelius.titan.core.util.TitanCleanup;
 
 public class TestGuiceServletConfig extends GuiceServletConfig {
 
@@ -47,13 +45,11 @@ public class TestGuiceServletConfig extends GuiceServletConfig {
         super.contextDestroyed(servletContextEvent);
 
         if(injector != null) {
-            TypeLiteral<GraphProvider<TitanGraph>> graphProviderType = new TypeLiteral<GraphProvider<TitanGraph>>() {};
-            Provider<GraphProvider<TitanGraph>> graphProvider = injector.getProvider(Key.get(graphProviderType));
-            TitanGraph graph = graphProvider.get().get();
+            AtlasGraph graph = AtlasGraphProvider.getGraphInstance();
 
             LOG.info("Clearing graph store");
             try {
-                TitanCleanup.clear(graph);
+                AtlasGraphProvider.cleanup();
             } catch (Exception e) {
                 LOG.warn("Clearing graph store failed ", e);
             }
