This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 873d76b4fcf9bbddd0fc513d4c52fb1d8b4ff2af Author: ajantha-bhat <ajanthab...@gmail.com> AuthorDate: Mon Jan 28 10:14:53 2019 +0530 [HOTFIX] SDV framework for presto cluster test suite [HOTFIX] SDV framework for presto cluster test suite a) Added a suite for presto cluster test with a sample test case where carbon presto reads the store created by spark. b) When a single suite was selected for running, test cases of other modules (such as SDK, CLI and processing) were also run. Fixed this problem by adding an sdvtest profile to the modules that had this issue. This closes #3111 --- common/pom.xml | 8 ++ examples/flink/pom.xml | 8 ++ format/pom.xml | 8 ++ .../PrestoAllDataTypeLocalDictTest.scala | 2 +- .../integrationtest/PrestoAllDataTypeTest.scala | 2 +- .../PrestoTestNonTransactionalTableFiles.scala | 2 +- .../carbondata/presto/server/PrestoServer.scala | 26 +++--- integration/spark-common-cluster-test/pom.xml | 17 ++++ .../sdv/generated/PrestoSampleTestCase.scala | 56 +++++++++++++ .../carbondata/cluster/sdv/suite/SDVSuites.scala | 16 ++++ .../apache/spark/sql/common/util/QueryTest.scala | 97 ++++++++++++++++++++-- .../spark/sql/test/Spark2TestQueryExecutor.scala | 5 ++ pom.xml | 2 + processing/pom.xml | 8 ++ store/sdk/pom.xml | 8 ++ streaming/pom.xml | 8 ++ tools/cli/pom.xml | 8 ++ 17 files changed, 255 insertions(+), 26 deletions(-) diff --git a/common/pom.xml b/common/pom.xml index e0022ef..e9b482b 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -68,5 +68,13 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> </project> diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml index 8d44e82..aae7b5e 100644 --- a/examples/flink/pom.xml +++ b/examples/flink/pom.xml @@ -74,5 +74,13 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> 
</project> \ No newline at end of file diff --git a/format/pom.xml b/format/pom.xml index 7afd6ef..a7a7a0d 100644 --- a/format/pom.xml +++ b/format/pom.xml @@ -68,5 +68,13 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> </project> \ No newline at end of file diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala index 4360977..2735969 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala @@ -79,7 +79,7 @@ class PrestoAllDataTypeLocalDictTest extends FunSuiteLike with BeforeAndAfterAll map.put("hive.metastore", "file") map.put("hive.metastore.catalog.dir", s"file://$storePath") - prestoServer.startServer(storePath, "testdb", map) + prestoServer.startServer("testdb", map) prestoServer.execute("drop table if exists testdb.testtable") prestoServer.execute("drop schema if exists testdb") prestoServer.execute("create schema testdb") diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala index 17490e4..205469c 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala @@ -80,7 +80,7 @@ class PrestoAllDataTypeTest extends FunSuiteLike with BeforeAndAfterAll { map.put("hive.metastore", "file") 
map.put("hive.metastore.catalog.dir", s"file://$storePath") - prestoServer.startServer(storePath, "testdb", map) + prestoServer.startServer("testdb", map) prestoServer.execute("drop table if exists testdb.testtable") prestoServer.execute("drop schema if exists testdb") prestoServer.execute("create schema testdb") diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala index 6d17b8b..bdee4a1 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala @@ -57,7 +57,7 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf map.put("hive.metastore", "file") map.put("hive.metastore.catalog.dir", s"file://$storePath") - prestoServer.startServer(storePath, "sdk_output", map) + prestoServer.startServer("sdk_output", map) } override def afterAll(): Unit = { diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala index 0bde313..672e90f 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala @@ -25,6 +25,7 @@ import scala.util.{Failure, Success, Try} import com.facebook.presto.Session import com.facebook.presto.execution.QueryIdGenerator +import com.facebook.presto.jdbc.PrestoStatement import com.facebook.presto.metadata.SessionPropertyManager import com.facebook.presto.spi.`type`.TimeZoneKey.UTC_KEY import com.facebook.presto.spi.security.Identity @@ -47,18 +48,17 @@ class 
PrestoServer { createSession lazy val queryRunner = new DistributedQueryRunner(createSession, 4, prestoProperties) var dbName : String = null + var statement : PrestoStatement = _ /** * start the presto server * - * @param carbonStorePath the store path of carbon */ - def startServer(carbonStorePath: String): Unit = { + def startServer(): Unit = { LOGGER.info("======== STARTING PRESTO SERVER ========") - val queryRunner: DistributedQueryRunner = createQueryRunner( - prestoProperties, carbonStorePath) + val queryRunner: DistributedQueryRunner = createQueryRunner(prestoProperties) LOGGER.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl) } @@ -66,25 +66,23 @@ class PrestoServer { /** * start the presto server * - * @param carbonStorePath the store path of carbon * @param dbName the database name, if not a default database */ - def startServer(carbonStorePath: String, dbName: String, properties: util.Map[String, String]= new util.HashMap[String, String]()): Unit = { + def startServer(dbName: String, properties: util.Map[String, String] = new util.HashMap[String, String]()): Unit = { this.dbName = dbName carbonProperties.putAll(properties) LOGGER.info("======== STARTING PRESTO SERVER ========") - val queryRunner: DistributedQueryRunner = createQueryRunner( - prestoProperties, carbonStorePath) - + val queryRunner: DistributedQueryRunner = createQueryRunner(prestoProperties) + val conn: Connection = createJdbcConnection(dbName) + statement = conn.createStatement().asInstanceOf[PrestoStatement] LOGGER.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl) } /** * Instantiates the Presto Server to connect with the Apache CarbonData */ - private def createQueryRunner(extraProperties: util.Map[String, String], - carbonStorePath: String): DistributedQueryRunner = { + private def createQueryRunner(extraProperties: util.Map[String, String]) = { Try { queryRunner.installPlugin(new CarbondataPlugin) val carbonProperties = ImmutableMap.builder[String, 
String] @@ -105,6 +103,7 @@ class PrestoServer { */ def stopServer(): Unit = { queryRunner.close() + statement.close() LOGGER.info("***** Stopping The Server *****") } @@ -117,9 +116,7 @@ class PrestoServer { def executeQuery(query: String): List[Map[String, Any]] = { Try { - val conn: Connection = createJdbcConnection(dbName) LOGGER.info(s"***** executing the query ***** \n $query") - val statement = conn.createStatement() val result: ResultSet = statement.executeQuery(query) convertResultSetToList(result) } match { @@ -131,11 +128,8 @@ class PrestoServer { } def execute(query: String) = { - Try { - val conn: Connection = createJdbcConnection(dbName) LOGGER.info(s"***** executing the query ***** \n $query") - val statement = conn.createStatement() statement.execute(query) } match { case Success(result) => result diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml index 23acfd8..a6c85ad 100644 --- a/integration/spark-common-cluster-test/pom.xml +++ b/integration/spark-common-cluster-test/pom.xml @@ -80,6 +80,21 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.facebook.presto</groupId> + <artifactId>presto-jdbc</artifactId> + <version>0.210</version> + <exclusions> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> @@ -164,6 +179,8 @@ <java.awt.headless>true</java.awt.headless> <spark.master.url>${spark.master.url}</spark.master.url> <hdfs.url>${hdfs.url}</hdfs.url> + <presto.jdbc.url>${presto.jdbc.url}</presto.jdbc.url> + <spark.hadoop.hive.metastore.uris>${spark.hadoop.hive.metastore.uris}</spark.hadoop.hive.metastore.uris> <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store> 
</systemProperties> </configuration> diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala new file mode 100644 index 0000000..336f8bc --- /dev/null +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/PrestoSampleTestCase.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.cluster.sdv.generated + +import org.apache.spark.sql.common.util._ +import org.scalatest.BeforeAndAfterAll + +class PrestoSampleTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + sql("DROP TABLE IF EXISTS sample_table") + if (System.getProperty("spark.master.url") != null) { + QueryTest.PrestoQueryTest.initJdbcConnection("default") + } + } + + test("test read spark store from presto ") { + sql("show tables").show(false) + + sql("DROP TABLE IF EXISTS sample_table") + sql("CREATE TABLE sample_table (name string) STORED BY 'carbondata'") + sql("insert into sample_table select 'ajantha'") + sql("select * from sample_table ").show(200, false) + sql("describe formatted sample_table ").show(200, false) + if (System.getProperty("spark.master.url") != null) { + // supports only running through cluster + val actualResult: List[Map[String, Any]] = QueryTest.PrestoQueryTest + .executeQuery("select * from sample_table") + println("ans---------" + actualResult(0).toString()) + val expectedResult: List[Map[String, Any]] = List(Map( + "name" -> "ajantha")) + assert(actualResult.toString() equals expectedResult.toString()) + } + } + + override def afterAll { + sql("DROP TABLE IF EXISTS sample_table") + QueryTest.PrestoQueryTest.closeJdbcConnection() + } + +} \ No newline at end of file diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala index f2ae2cb..5367e0d 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala @@ -187,3 +187,19 @@ class SDVSuites4 extends Suites with BeforeAndAfterAll { println("---------------- Stopped spark -----------------") } } + 
+/** + * Suite class for presto tests + */ +class SDVSuites5 extends Suites with BeforeAndAfterAll { + + val suites = new PrestoSampleTestCase :: Nil + + override val nestedSuites = suites.toIndexedSeq + + override protected def afterAll() = { + println("---------------- Stopping spark -----------------") + TestQueryExecutor.INSTANCE.stop() + println("---------------- Stopped spark -----------------") + } +} \ No newline at end of file diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala index 39beae1..9d4fe79 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala @@ -18,23 +18,25 @@ package org.apache.spark.sql.common.util import java.io.{ObjectInputStream, ObjectOutputStream} -import java.util.{Locale, TimeZone} +import java.sql.{Connection, DriverManager, ResultSet} +import java.util.{Locale, Properties} -import org.apache.carbondata.common.logging.LogServiceFactory import scala.collection.JavaConversions._ +import scala.util.{Failure, Success, Try} +import com.facebook.presto.jdbc.{PrestoConnection, PrestoStatement} import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.command.LoadDataCommand import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor} -import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.scalatest.Suite -import org.apache.carbondata.core.constants.CarbonCommonConstants +import 
org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.util.CarbonProperties +import org.apache.commons.lang.StringUtils class QueryTest extends PlanTest with Suite { @@ -154,8 +156,6 @@ object QueryTest { } } - import java.text.DecimalFormat - /** * Runs the plan and makes sure the answer matches the expected result. * If there was exception during the execution or the contents of the DataFrame does not @@ -220,4 +220,87 @@ object QueryTest { return None } + + object PrestoQueryTest { + + var statement : PrestoStatement = _ + + def initJdbcConnection(dbName: String): Unit = { + val conn: Connection = if (System.getProperty("presto.jdbc.url") != null) { + createJdbcConnection(dbName, System.getProperty("presto.jdbc.url")) + } else { + createJdbcConnection(dbName, "localhost:8086") + } + statement = conn.createStatement().asInstanceOf[PrestoStatement] + } + + def closeJdbcConnection(): Unit = { + statement.close() + } + + /** + * execute the query by establishing the jdbc connection + * + * @param query + * @return + */ + def executeQuery(query: String): List[Map[String, Any]] = { + Try { + val result: ResultSet = statement.executeQuery(query) + convertResultSetToList(result) + } match { + case Success(result) => result + case Failure(jdbcException) => + throw jdbcException + } + } + + /** + * Creates a JDBC Client to connect CarbonData to Presto + * + * @return + */ + private def createJdbcConnection(dbName: String, url: String) = { + val JDBC_DRIVER = "com.facebook.presto.jdbc.PrestoDriver" + var DB_URL : String = null + if (StringUtils.isEmpty(dbName)) { + DB_URL = "jdbc:presto://"+ url + "/carbondata/default" + } else { + DB_URL = "jdbc:presto://" + url + "/carbondata/" + dbName + } + val properties = new Properties + // The database Credentials + properties.setProperty("user", "test") + + // STEP 2: Register JDBC driver + Class.forName(JDBC_DRIVER) + // STEP 3: Open 
a connection + DriverManager.getConnection(DB_URL, properties) + } + + /** + * convert result set into scala list of map + * each map represents a row + * + * @param queryResult + * @return + */ + private def convertResultSetToList(queryResult: ResultSet): List[Map[String, Any]] = { + val metadata = queryResult.getMetaData + val colNames = (1 to metadata.getColumnCount) map metadata.getColumnName + Iterator.continually(buildMapFromQueryResult(queryResult, colNames)).takeWhile(_.isDefined) + .map(_.get).toList + } + + private def buildMapFromQueryResult(queryResult: ResultSet, + colNames: Seq[String]): Option[Map[String, Any]] = { + if (queryResult.next()) { + Some(colNames.map(name => name -> queryResult.getObject(name)).toMap) + } + else { + None + } + } + } + } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala index eaef9c1..0729713 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala @@ -59,6 +59,11 @@ object Spark2TestQueryExecutor { FileFactory.getConfiguration. 
set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER") } + + if (System.getProperty("spark.hadoop.hive.metastore.uris") != null) { + conf.set("spark.hadoop.hive.metastore.uris", + System.getProperty("spark.hadoop.hive.metastore.uris")) + } val metaStoreDB = s"$integrationPath/spark-common-cluster-test/target" val spark = SparkSession .builder().config(conf) diff --git a/pom.xml b/pom.xml index 3f7cea6..aab2808 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,8 @@ <dev.path>${basedir}/dev</dev.path> <spark.master.url>local[2]</spark.master.url> <hdfs.url>local</hdfs.url> + <presto.jdbc.url>localhost:8086</presto.jdbc.url> + <spark.hadoop.hive.metastore.uris>thrift://localhost:8086</spark.hadoop.hive.metastore.uris> <suite.name>org.apache.carbondata.cluster.sdv.suite.SDVSuites</suite.name> <script.exetension>.sh</script.exetension> <carbon.hive.based.metastore>false</carbon.hive.based.metastore> diff --git a/processing/pom.xml b/processing/pom.xml index 8a22150..d8598bd 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -101,5 +101,13 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> </project> diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index 20a5991..3b56b72 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -90,4 +90,12 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> </project> diff --git a/streaming/pom.xml b/streaming/pom.xml index 1f7c431..ffc234f 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -107,4 +107,12 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> </project> diff --git a/tools/cli/pom.xml b/tools/cli/pom.xml index 
858c0d9..7d456c4 100644 --- a/tools/cli/pom.xml +++ b/tools/cli/pom.xml @@ -88,4 +88,12 @@ </plugin> </plugins> </build> + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> </project>