http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala ---------------------------------------------------------------------- diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala deleted file mode 100644 index 824730f..0000000 --- a/examples/spark/src/main/scala/org/apache/carbondata/examples/PerfTest.scala +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.examples - -import java.io.File - -import scala.util.Random - -import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext} -import org.apache.spark.sql.types.{DataTypes, StructType} - -import org.apache.carbondata.examples.PerfTest._ -import org.apache.carbondata.examples.util.ExampleUtils - -// scalastyle:off println - -/** - * represent one query - */ -class Query(val queryType: String, val queryNo: Int, val sqlString: String) { - - /** - * run the query in a batch and calculate average time - * - * @param sqlContext context to run the query - * @param runs run how many time - * @param datasource datasource to run - */ - def run(sqlContext: SQLContext, runs: Int, datasource: String): QueryResult = { - // run repeated and calculate average time elapsed - require(runs >= 1) - val sqlToRun = makeSQLString(datasource) - - val firstTime = withTime { - sqlContext.sql(sqlToRun).collect - } - - var totalTime: Long = 0 - var result: Array[Row] = null - (1 to (runs - 1)).foreach { x => - totalTime += withTime { - result = sqlContext.sql(sqlToRun).collect - } - } - - val avgTime = totalTime / (runs - 1) - QueryResult(datasource, result, avgTime, firstTime) - } - - private def makeSQLString(datasource: String): String = { - sqlString.replaceFirst("tableName", PerfTest.makeTableName(datasource)) - } - -} - -/** - * query performance result - */ -case class QueryResult(datasource: String, result: Array[Row], avgTime: Long, firstTime: Long) - -class QueryRunner(sqlContext: SQLContext, dataFrame: DataFrame, datasources: Seq[String]) { - - /** - * run a query on each datasource - */ - def run(query: Query, runs: Int): Seq[QueryResult] = { - var results = Seq[QueryResult]() - datasources.foreach { datasource => - val result = query.run(sqlContext, runs, datasource) - results :+= result - } - checkResult(results) - results - } - - private def checkResult(results: Seq[QueryResult]): Unit = { - results.foldLeft(results.head) { (last, cur) => - if (last.result.sortBy(_.toString()).sameElements(cur.result.sortBy(_.toString()))) cur - else sys.error(s"result is not the same between " + - s"${last.datasource} and " + - 
s"${cur.datasource}") - } - } - - private def loadToNative(datasource: String): Unit = { - val savePath = PerfTest.savePath(datasource) - println(s"loading data into $datasource, path: $savePath") - dataFrame.write - .mode(SaveMode.Overwrite) - .format(datasource) - .save(savePath) - sqlContext.read - .format(datasource) - .load(savePath) - .registerTempTable(PerfTest.makeTableName(datasource)) - } - - /** - * load data to each datasource - */ - def loadData: Seq[QueryResult] = { - // load data into all datasources - var results = Seq[QueryResult]() - datasources.foreach { datasource => - val time = withTime { - datasource match { - case "parquet" => - dataFrame.sqlContext.setConf(s"spark.sql.$datasource.compression.codec", "snappy") - loadToNative(datasource) - case "orc" => - dataFrame.sqlContext.sparkContext.hadoopConfiguration.set("orc.compress", "SNAPPY") - loadToNative(datasource) - case "carbon" => - sqlContext.sql(s"DROP TABLE IF EXISTS ${PerfTest.makeTableName(datasource)}") - println(s"loading data into $datasource, path: " + - s"${dataFrame.sqlContext.asInstanceOf[CarbonContext].storePath}") - dataFrame.write - .format("org.apache.spark.sql.CarbonSource") - .option("tableName", PerfTest.makeTableName(datasource)) - .mode(SaveMode.Overwrite) - .save() - case _ => sys.error("unsupported data source") - } - } - println(s"load data into $datasource completed, time taken ${time/1000000}ms") - results :+= QueryResult(datasource, null, time, time) - } - results - } - - def shutDown(): Unit = { - // drop all tables and temp files - datasources.foreach { - case datasource @ ("parquet" | "orc") => - val f = new File(PerfTest.savePath(datasource)) - if (f.exists()) f.delete() - case "carbon" => - sqlContext.sql(s"DROP TABLE IF EXISTS ${PerfTest.makeTableName("carbon")}") - case _ => sys.error("unsupported data source") - } - } -} - -/** - * template for table data generation - * - * @param dimension number of dimension columns and their cardinality - * @param measure number of measure columns - */ -case class TableTemplate(dimension: Seq[(Int, Int)], measure: Int) - -/** - * utility to generate random data according to template - */ -class TableGenerator(sqlContext: SQLContext) { - - /** - * generate a dataframe from random data - */ - def genDataFrame(template: TableTemplate, rows: Int): DataFrame = { - val measures = template.measure - val dimensions = template.dimension.foldLeft(0) {(x, y) => x + y._1} - val cardinality = template.dimension.foldLeft(Seq[Int]()) {(x, y) => - x ++ (1 to y._1).map(z => y._2) - } - print(s"generating data: $rows rows of $dimensions dimensions and $measures measures. 
") - println("cardinality for each dimension: " + cardinality.mkString(", ")) - - val dimensionFields = (1 to dimensions).map { id => - DataTypes.createStructField(s"c$id", DataTypes.StringType, false) - } - val measureFields = (dimensions + 1 to dimensions + measures).map { id => - DataTypes.createStructField(s"c$id", DataTypes.IntegerType, false) - } - val schema = StructType(dimensionFields ++ measureFields) - val data = sqlContext.sparkContext.parallelize(1 to rows).map { x => - val random = new Random() - val dimSeq = (1 to dimensions).map { y => - s"P${y}_${random.nextInt(cardinality(y - 1))}" - } - val msrSeq = (1 to measures).map { y => - random.nextInt(10) - } - Row.fromSeq(dimSeq ++ msrSeq) - } - val df = sqlContext.createDataFrame(data, schema) - df.write.mode(SaveMode.Overwrite).parquet(PerfTest.savePath("temp")) - sqlContext.parquetFile(PerfTest.savePath("temp")) - } -} - -object PerfTest { - - private val olap: Seq[String] = Seq( - """SELECT c3, c4, sum(c8) FROM tableName - |WHERE c1 = 'P1_23' and c2 = 'P2_43' - |GROUP BY c3, c4""".stripMargin, - - """SELECT c2, c3, sum(c9) FROM tableName - |WHERE c1 = 'P1_432' and c4 = 'P4_3' and c5 = 'P5_2' - |GROUP by c2, c3 """.stripMargin, - - """SELECT c2, count(distinct c1), sum(c8) FROM tableName - |WHERE c3="P3_4" and c5="P5_4" - |GROUP BY c2 """.stripMargin, - - """SELECT c2, c5, count(distinct c1), sum(c7) FROM tableName - |WHERE c4="P4_4" and c5="P5_7" and c8>4 - |GROUP BY c2, c5 """.stripMargin - ) - - private val point: Seq[String] = Seq( - """SELECT c4 FROM tableName - |WHERE c1="P1_43" """.stripMargin, - - """SELECT c3 FROM tableName - |WHERE c1="P1_542" and c2="P2_23" """.stripMargin, - - """SELECT c3, c5 FROM tableName - |WHERE c1="P1_52" and c7=4""".stripMargin, - - """SELECT c4, c9 FROM tableName - |WHERE c1="P1_43" and c8<3""".stripMargin - ) - - private val filter: Seq[String] = Seq( - """SELECT * FROM tableName - |WHERE c2="P2_43" """.stripMargin, - - """SELECT * FROM tableName - |WHERE c3="P3_3" """.stripMargin, - - """SELECT * FROM tableName - |WHERE c2="P2_32" and c3="P3_23" """.stripMargin, - - """SELECT * FROM tableName - |WHERE c3="P3_28" and c4="P4_3" """.stripMargin - ) - - private val scan: Seq[String] = Seq( - """SELECT sum(c7), sum(c8), avg(c9), max(c10) FROM tableName """.stripMargin, - - """SELECT sum(c7) FROM tableName - |WHERE c2="P2_32" """.stripMargin, - - """SELECT sum(c7), sum(c8), sum(9), sum(c10) FROM tableName - |WHERE c4="P4_4" """.stripMargin, - - """SELECT sum(c7), sum(c8), sum(9), sum(c10) FROM tableName - |WHERE c2="P2_75" and c6<5 """.stripMargin - ) - - def main(args: Array[String]) { - val cc = ExampleUtils.createCarbonContext("PerfTest") - - // prepare performance queries - var workload = Seq[Query]() - olap.zipWithIndex.foreach(x => workload :+= new Query("OLAP Query", x._2, x._1)) - point.zipWithIndex.foreach(x => workload :+= new Query("Point Query", x._2, x._1)) - filter.zipWithIndex.foreach(x => workload :+= new Query("Filter Query", x._2, x._1)) - scan.zipWithIndex.foreach(x => workload :+= new Query("Scan Query", x._2, x._1)) - - // prepare data - val rows = 3 * 1000 * 1000 - val dimension = Seq((1, 1 * 1000), (1, 100), (1, 50), (2, 10)) // cardinality for each column - val measure = 5 // number of measure - val template = TableTemplate(dimension, measure) - val df = new TableGenerator(cc).genDataFrame(template, rows) - println("generate data completed") - - // run all queries against all data sources - val datasource = Seq("parquet", "orc", "carbon") - val runner = new 
QueryRunner(cc, df, datasource) - - val results = runner.loadData - println(s"load performance: ${results.map(_.avgTime / 1000000L).mkString(", ")}") - - var parquetTime: Double = 0 - var orcTime: Double = 0 - var carbonTime: Double = 0 - - println(s"query id: ${datasource.mkString(", ")}, result in millisecond") - workload.foreach { query => - // run 4 times each round, will print performance of first run and avg time of last 3 runs - print(s"${query.queryType} ${query.queryNo}: ") - val results = runner.run(query, 4) - print(s"${results.map(_.avgTime / 1000000L).mkString(", ")} ") - println(s"[sql: ${query.sqlString.replace('\n', ' ')}]") - parquetTime += results(0).avgTime - orcTime += results(1).avgTime - carbonTime += results(2).avgTime - } - - println(s"Total time: ${parquetTime / 1000000}, ${orcTime / 1000000}, " + - s"${carbonTime / 1000000} = 1 : ${parquetTime / orcTime} : ${parquetTime / carbonTime}") - runner.shutDown() - } - - def makeTableName(datasource: String): String = { - s"${datasource}_perftest_table" - } - - def savePath(datasource: String): String = - s"${ExampleUtils.currentPath}/target/perftest/${datasource}" - - def withTime(body: => Unit): Long = { - val start = System.nanoTime() - body - System.nanoTime() - start - } - -} -// scalastyle:on println
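For reference, the measurement scheme of the deleted PerfTest.scala reduces to a small self-contained sketch in plain Scala, with no Spark dependency: the first (cold) run is timed separately and the remaining (warm) runs are averaged. The QueryResult and withTime names mirror the deleted file; the benchmark helper and the guard for a single run are illustrative additions, since the deleted code divided by (runs - 1) unconditionally and would fail for runs == 1.

// Minimal sketch of the cold-run / warm-average timing used above.
object TimingSketch {

  case class QueryResult(firstTimeNs: Long, avgTimeNs: Long)

  // time a single execution of body, in nanoseconds
  def withTime(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    System.nanoTime() - start
  }

  // the first (cold) run is reported separately; remaining (warm) runs are averaged
  def benchmark(runs: Int)(body: => Unit): QueryResult = {
    require(runs >= 1)
    val firstTime = withTime(body)
    var totalTime = 0L
    (1 until runs).foreach(_ => totalTime += withTime(body))
    // guard added here: fall back to the cold time when there is only one run
    val avgTime = if (runs > 1) totalTime / (runs - 1) else firstTime
    QueryResult(firstTime, avgTime)
  }

  def main(args: Array[String]): Unit = {
    val r = benchmark(4)(Thread.sleep(10))
    println(s"first: ${r.firstTimeNs / 1000000}ms, avg of rest: ${r.avgTimeNs / 1000000}ms")
  }
}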
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala deleted file mode 100644 index 3ab61bf..0000000 --- a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.examples.util - -import java.io.DataOutputStream - -import scala.collection.mutable.{ArrayBuffer, HashSet} - -import org.apache.spark.SparkContext - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datastore.impl.FileFactory - -object AllDictionaryUtil { - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def extractDictionary(sc: SparkContext, - srcData: String, - outputPath: String, - fileHeader: String, - dictCol: String): Unit = { - val fileHeaderArr = fileHeader.split(",") - val isDictCol = new Array[Boolean](fileHeaderArr.length) - for (i <- 0 until fileHeaderArr.length) { - if (dictCol.contains("|" + fileHeaderArr(i).toLowerCase() + "|")) { - isDictCol(i) = true - } else { - isDictCol(i) = false - } - } - val dictionaryRdd = sc.textFile(srcData).flatMap(x => { - val tokens = x.split(",") - val result = new ArrayBuffer[(Int, String)]() - for (i <- 0 until isDictCol.length) { - if (isDictCol(i)) { - try { - result += ((i, tokens(i))) - } catch { - case ex: ArrayIndexOutOfBoundsException => - LOGGER.error("Read a bad record: " + x) - } - } - } - result - }).groupByKey().flatMap(x => { - val distinctValues = new HashSet[(Int, String)]() - for (value <- x._2) { - distinctValues.add(x._1, value) - } - distinctValues - }) - val dictionaryValues = dictionaryRdd.map(x => x._1 + "," + x._2).collect() - saveToFile(dictionaryValues, outputPath) - } - - def cleanDictionary(outputPath: String): Unit = { - try { - val fileType = FileFactory.getFileType(outputPath) - val file = FileFactory.getCarbonFile(outputPath, fileType) - if (file.exists()) { - file.delete() - } - } catch { - case ex: Exception => - LOGGER.error("Clean dictionary catching exception:" + ex) - } - } - - def saveToFile(contents: Array[String], outputPath: String): Unit = { - var writer: DataOutputStream = null - try { - val fileType = FileFactory.getFileType(outputPath) - val file = FileFactory.getCarbonFile(outputPath, fileType) - if (!file.exists()) { - file.createNewFile() - } - writer = 
FileFactory.getDataOutputStream(outputPath, fileType) - for (content <- contents) { - writer.writeBytes(content + "\n") - } - } catch { - case ex: Exception => - LOGGER.error("Save dictionary to file catching exception:" + ex) - } finally { - if (writer != null) { - try { - writer.close() - } catch { - case ex: Exception => - LOGGER.error("Close output stream catching exception:" + ex) - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala ---------------------------------------------------------------------- diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala deleted file mode 100644 index f98ec3b..0000000 --- a/examples/spark/src/main/scala/org/apache/carbondata/examples/util/ExampleUtils.scala +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.examples.util - -import java.io.File - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{CarbonContext, SaveMode} - -import org.apache.carbondata.core.util.CarbonProperties - -// scalastyle:off println - -object ExampleUtils { - - def currentPath: String = new File(this.getClass.getResource("/").getPath + "../../") - .getCanonicalPath - val storeLocation = currentPath + "/target/store" - - def createCarbonContext(appName: String): CarbonContext = { - val sc = new SparkContext(new SparkConf() - .setAppName(appName) - .setMaster("local[2]")) - sc.setLogLevel("ERROR") - - println(s"Starting $appName using spark version ${sc.version}") - - val cc = new CarbonContext(sc, storeLocation, currentPath + "/target/carbonmetastore") - - CarbonProperties.getInstance() - .addProperty("carbon.storelocation", storeLocation) - cc - } - - /** - * This func will write a sample CarbonData file containing following schema: - * c1: String, c2: String, c3: Double - * Returns table path - */ - def writeSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = { - cc.sql(s"DROP TABLE IF EXISTS $tableName") - writeDataframe(cc, tableName, numRows, SaveMode.Overwrite) - s"$storeLocation/default/$tableName" - } - - /** - * This func will append data to the CarbonData file - * Returns table path - */ - def appendSampleCarbonFile(cc: CarbonContext, tableName: String, numRows: Int = 1000): String = { - writeDataframe(cc, tableName, numRows, SaveMode.Append) - s"$storeLocation/default/$tableName" - } - - /** - * create a new dataframe and write to CarbonData file, based on save mode - */ - private def writeDataframe( - cc: CarbonContext, tableName: String, numRows: Int, mode: SaveMode): Unit = { - // use CarbonContext to write CarbonData files - import cc.implicits._ - val sc = cc.sparkContext - val df = sc.parallelize(1 to numRows, 2) - .map(x => ("a", "b", x)) - .toDF("c1", "c2", "c3") - - // save dataframe directl to carbon file without tempCSV - df.write - .format("carbondata") - .option("tableName", tableName) - .option("compress", "true") - .option("tempCSV", "false") - .mode(mode) - .save() - } - - def cleanSampleCarbonFile(cc: CarbonContext, tableName: String): Unit = { - cc.sql(s"DROP TABLE IF EXISTS $tableName") - } -} -// scalastyle:on println - http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/examples/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml index 94af8ec..af25771 100644 --- a/examples/spark2/pom.xml +++ b/examples/spark2/pom.xml @@ -38,20 +38,6 @@ <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-spark2</artifactId> <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/hive/pom.xml ---------------------------------------------------------------------- diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml index 13bd581..e0ad499 100644 --- a/integration/hive/pom.xml +++ 
b/integration/hive/pom.xml @@ -67,20 +67,6 @@ <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-spark2</artifactId> <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.hive</groupId> @@ -116,12 +102,6 @@ <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-hadoop</artifactId> <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/presto/pom.xml ---------------------------------------------------------------------- diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml index e11ce4c..13d351d 100644 --- a/integration/presto/pom.xml +++ b/integration/presto/pom.xml @@ -244,10 +244,6 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - </exclusion> </exclusions> </dependency> <dependency> @@ -371,10 +367,6 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - </exclusion> </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common-cluster-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml index 9728a5c..e529035 100644 --- a/integration/spark-common-cluster-test/pom.xml +++ b/integration/spark-common-cluster-test/pom.xml @@ -40,12 +40,6 @@ <artifactId>carbondata-spark-common</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> @@ -167,16 +161,6 @@ <artifactId>carbondata-spark</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> </profile> @@ -188,16 +172,6 @@ <artifactId>carbondata-spark</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> </profile> @@ -212,16 +186,6 @@ <artifactId>carbondata-spark2</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> 
- <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> </profile> http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml index 7c67000..b2ee316 100644 --- a/integration/spark-common-test/pom.xml +++ b/integration/spark-common-test/pom.xml @@ -91,12 +91,6 @@ <artifactId>carbondata-spark-common</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.spark</groupId> @@ -341,16 +335,6 @@ <artifactId>carbondata-spark</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> </profile> @@ -362,16 +346,6 @@ <artifactId>carbondata-spark</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> </profile> @@ -386,16 +360,6 @@ <artifactId>carbondata-spark2</artifactId> <version>${project.version}</version> <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - </exclusions> </dependency> </dependencies> </profile> http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark-common/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml index 82ff7a4..d40e213 100644 --- a/integration/spark-common/pom.xml +++ b/integration/spark-common/pom.xml @@ -38,16 +38,6 @@ <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-processing</artifactId> <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.carbondata</groupId> http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/CARBON_SPARK_INTERFACELogResource.properties ---------------------------------------------------------------------- diff --git a/integration/spark/CARBON_SPARK_INTERFACELogResource.properties b/integration/spark/CARBON_SPARK_INTERFACELogResource.properties deleted file mode 100644 index 61856cf..0000000 --- 
a/integration/spark/CARBON_SPARK_INTERFACELogResource.properties +++ /dev/null @@ -1,18 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -carbon.spark.interface = {0} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml deleted file mode 100644 index 5060809..0000000 --- a/integration/spark/pom.xml +++ /dev/null @@ -1,194 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-parent</artifactId> - <version>1.3.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - - <artifactId>carbondata-spark</artifactId> - <name>Apache CarbonData :: Spark</name> - - <properties> - <dev.path>${basedir}/../../dev</dev.path> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-core</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-processing</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-hadoop</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-spark-common</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - </dependency> - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_${scala.binary.version}</artifactId> - <version>2.2.1</version> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <testSourceDirectory>src/test/scala</testSourceDirectory> - <resources> - <resource> - <directory>src/resources</directory> - </resource> - <resource> - <directory>.</directory> - <includes> - <include>CARBON_SPARK_INTERFACELogResource.properties</include> - </includes> - </resource> - </resources> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <version>2.15.2</version> - <executions> - <execution> - <id>compile</id> - <goals> - <goal>compile</goal> - </goals> - <phase>compile</phase> - </execution> - <execution> - <id>testCompile</id> - <goals> - <goal>testCompile</goal> - </goals> - <phase>test</phase> - </execution> - <execution> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-surefire-plugin</artifactId> - <version>2.18</version> - <!-- Note config is repeated in scalatest config --> - <configuration> - 
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> - <systemProperties> - <java.awt.headless>true</java.awt.headless> - </systemProperties> - <failIfNoTests>false</failIfNoTests> - </configuration> - </plugin> - <plugin> - <groupId>org.scalatest</groupId> - <artifactId>scalatest-maven-plugin</artifactId> - <version>1.0</version> - <!-- Note config is repeated in surefire config --> - <configuration> - <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> - <junitxml>.</junitxml> - <filereports>CarbonTestSuite.txt</filereports> - <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m - </argLine> - <stderr /> - <environmentVariables> - </environmentVariables> - <systemProperties> - <java.awt.headless>true</java.awt.headless> - </systemProperties> - </configuration> - <executions> - <execution> - <id>test</id> - <goals> - <goal>test</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - <profiles> - <profile> - <id>sdvtest</id> - <properties> - <maven.test.skip>true</maven.test.skip> - </properties> - </profile> - </profiles> - -</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java deleted file mode 100644 index 92c8402..0000000 --- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.spark.readsupport; - -import java.io.IOException; -import java.sql.Date; -import java.sql.Timestamp; - -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.metadata.encoder.Encoding; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; -import org.apache.carbondata.core.util.DataTypeUtil; -import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport; - -import org.apache.spark.sql.Row; -import org.apache.spark.sql.catalyst.expressions.GenericRow; -import org.apache.spark.unsafe.types.UTF8String; - -public class SparkRowReadSupportImpl extends DictionaryDecodeReadSupport<Row> { - - @Override public void initialize(CarbonColumn[] carbonColumns, - AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException { - super.initialize(carbonColumns, absoluteTableIdentifier); - //can initialize and generate schema here. - } - - @Override public Row readRow(Object[] data) { - for (int i = 0; i < dictionaries.length; i++) { - if (data[i] == null) { - continue; - } - if (dictionaries[i] != null) { - data[i] = DataTypeUtil - .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKeyInBytes((int) data[i]), - (CarbonDimension) carbonColumns[i]); - if (data[i] == null) { - continue; - } - if (dataTypes[i] == DataTypes.STRING) { - data[i] = UTF8String.fromString(data[i].toString()); - } else if (dataTypes[i] == DataTypes.TIMESTAMP) { - data[i] = new Timestamp((long) data[i]); - } else if (dataTypes[i] == DataTypes.DATE) { - data[i] = new Date((long) data[i]); - } else if (dataTypes[i] == DataTypes.LONG) { - data[i] = data[i]; - } - } - else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { - //convert the long to timestamp in case of direct dictionary column - if (DataTypes.TIMESTAMP == carbonColumns[i].getDataType()) { - data[i] = new Timestamp((long) data[i] / 1000L); - } else if (DataTypes.DATE == carbonColumns[i].getDataType()) { - data[i] = new Date((long) data[i]); - } - } - } - return new GenericRow(data); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/0bf597d9/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala deleted file mode 100644 index 7881b93..0000000 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark - -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.compress.GzipCodec -import org.apache.spark.sql._ -import org.apache.spark.sql.execution.command.LoadTable -import org.apache.spark.sql.types._ - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType} - -class CarbonDataFrameWriter(val dataFrame: DataFrame) { - - private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = { - checkContext() - val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext) - - // create a new table using dataframe's schema and write its content into the table - cc.sql(makeCreateTableString(dataFrame.schema, new CarbonOption(parameters))) - writeToCarbonFile(parameters) - } - - def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = { - // append the data as a new load - checkContext() - writeToCarbonFile(parameters) - } - - private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = { - val options = new CarbonOption(parameters) - val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext) - if (options.tempCSV) { - loadTempCSV(options, cc) - } else { - loadDataFrame(options, cc) - } - } - - /** - * Firstly, saving DataFrame to CSV files - * Secondly, load CSV files - * @param options - * @param cc - */ - private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = { - // temporary solution: write to csv file, then load the csv into carbon - val storePath = CarbonEnv.get.carbonMetastore.storePath - val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR) - .append("tempCSV") - .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName) - .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName) - .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString - writeToTempCSVFile(tempCSVFolder, options) - - val tempCSVPath = new Path(tempCSVFolder) - val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration) - - def countSize(): Double = { - var size: Double = 0 - val itor = fs.listFiles(tempCSVPath, true) - while (itor.hasNext) { - val f = itor.next() - if (f.getPath.getName.startsWith("part-")) { - size += f.getLen - } - } - size - } - - LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB") - - try { - cc.sql(makeLoadString(tempCSVFolder, options)) - } finally { - fs.delete(tempCSVPath, true) - } - } - - private def checkContext(): Unit = { - // To avoid derby problem, dataframe need to be writen and read using CarbonContext - require(dataFrame.sqlContext.isInstanceOf[CarbonContext], - "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe" - ) - } - - private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = { - - val strRDD = dataFrame.rdd.mapPartitions { case iter => - new Iterator[String] { - override def hasNext = iter.hasNext - - def convertToCSVString(seq: Seq[Any]): String = { - val build = new java.lang.StringBuilder() - if (seq.head != null) { - build.append(seq.head.toString) - } - val itemIter = seq.tail.iterator - while (itemIter.hasNext) { - 
build.append(CarbonCommonConstants.COMMA) - val value = itemIter.next() - if (value != null) { - build.append(value.toString) - } - } - build.toString - } - - override def next: String = { - convertToCSVString(iter.next.toSeq) - } - } - } - - if (options.compress) { - strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec]) - } else { - strRDD.saveAsTextFile(tempCSVFolder) - } - } - - /** - * Loading DataFrame directly without saving DataFrame to CSV files. - * @param options - * @param cc - */ - private def loadDataFrame(options: CarbonOption, cc: CarbonContext): Unit = { - val header = dataFrame.columns.mkString(",") - LoadTable( - Some(options.dbName), - options.tableName, - null, - Seq(), - Map("fileheader" -> header) ++ options.toMap, - isOverwriteExist = false, - null, - Some(dataFrame), - None).run(cc) - } - - private def convertToCarbonType(sparkType: DataType): String = { - sparkType match { - case StringType => CarbonType.STRING.getName - case IntegerType => CarbonType.INT.getName - case ShortType => "smallint" - case LongType => "bigint" - case FloatType => CarbonType.DOUBLE.getName - case DoubleType => CarbonType.DOUBLE.getName - case TimestampType => CarbonType.TIMESTAMP.getName - case DateType => CarbonType.DATE.getName - case decimal: DecimalType => s"${CarbonType.DECIMAL.getName} (${decimal.precision}" + - s", ${decimal.scale})" - case other => sys.error(s"unsupported type: $other") - } - } - - private def makeCreateTableString(schema: StructType, options: CarbonOption): String = { - val properties = Map( - "DICTIONARY_INCLUDE" -> options.dictionaryInclude, - "DICTIONARY_EXCLUDE" -> options.dictionaryExclude - ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",") - val carbonSchema = schema.map { field => - s"${ field.name } ${ convertToCarbonType(field.dataType) }" - } - s""" - CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName} - (${ carbonSchema.mkString(", ") }) - STORED BY '${ CarbonContext.datasourceName }' - ${ if (properties.nonEmpty) " TBLPROPERTIES (" + properties + ")" else ""} - """ - } - - private def makeLoadString(csvFolder: String, options: CarbonOption): String = { - s""" - LOAD DATA INPATH '$csvFolder' - INTO TABLE ${options.dbName}.${options.tableName} - OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}', - 'SINGLE_PASS' = '${options.singlePass}') - """ - } - - -}
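The tempCSV path in the deleted CarbonDataFrameWriter.scala above converts each Row to a comma-joined line, rendering null values as empty fields. Below is a minimal, self-contained sketch of that per-row conversion in plain Scala; the literal ',' stands in for CarbonCommonConstants.COMMA, and the sample rows in main are illustrative only (the real code consumed dataFrame.rdd partitions).

object CsvLineSketch {

  // mirrors the deleted convertToCSVString: nulls become empty fields, all
  // other values are rendered with toString and joined by commas; like the
  // original, this assumes each row has at least one column
  def convertToCSVString(seq: Seq[Any]): String = {
    val build = new java.lang.StringBuilder()
    if (seq.head != null) {
      build.append(seq.head.toString)
    }
    val itemIter = seq.tail.iterator
    while (itemIter.hasNext) {
      build.append(',') // CarbonCommonConstants.COMMA in the original
      val value = itemIter.next()
      if (value != null) {
        build.append(value.toString)
      }
    }
    build.toString
  }

  def main(args: Array[String]): Unit = {
    println(convertToCSVString(Seq("a", null, 3))) // prints "a,,3"
    println(convertToCSVString(Seq(null, "b")))    // prints ",b"
  }
}

Note that a null field and an empty string produce identical output here, so the two cannot be distinguished after the CSV round-trip; writing the DataFrame directly (the 'tempCSV' = 'false' path shown in ExampleUtils above) avoids that ambiguity.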