Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1713#discussion_r178246812
  
    --- Diff: 
examples/spark2/src/main/scala/org/apache/carbondata/benchmark/ConcurrentQueryBenchmark.scala
 ---
    @@ -0,0 +1,631 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.benchmark
    +
    +import java.io.File
    +import java.text.SimpleDateFormat
    +import java.util
    +import java.util.Date
    +import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
    +
    +import scala.util.Random
    +
    +import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
CarbonVersionConstants}
    +import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
    +
    +// scalastyle:off println
    +/**
    + * Test concurrent query performance of CarbonData
    + *
    + * This benchmark will print out some information:
    + * 1.Environment information
    + * 2.Parameters information
    + * 3.concurrent query performance result using parquet format
    + * 4.concurrent query performance result using CarbonData format
    + *
    + * This benchmark default run in local model,
    + * user can change 'runInLocal' to false if want to run in cluster,
    + * user can change variables like:
    + *
    + * spark-submit \
    +    --class org.apache.carbondata.benchmark.ConcurrentQueryBenchmark \
    +    --master  yarn \
    +    --deploy-mode client \
    +    --driver-memory 16g \
    +    --executor-cores 4g \
    +    --executor-memory 24g \
    +    --num-executors 3  \
    +    concurrencyTest.jar \
    +    totalNum threadNum taskNum resultIsEmpty runInLocal generateFile 
deleteFile
    + * details in initParameters method of this benchmark
    + */
    +object ConcurrentQueryBenchmark {
    +
    +  // generate number of data
    +  var totalNum = 1 * 1000 * 1000
    +  // the number of thread pool
    +  var threadNum = 16
    +  // task number of spark sql query
    +  var taskNum = 100
    +  // whether is result empty, if true then result is empty
    +  var resultIsEmpty = true
    +  // the store path of task details
    +  var path: String = "/tmp/carbondata"
    +  // whether run in local or cluster
    +  var runInLocal = true
    +  // whether generate new file
    +  var generateFile = true
    +  // whether delete file
    +  var deleteFile = true
    +
    +  val cardinalityId = 100 * 1000 * 1000
    +  val cardinalityCity = 6
    +
    +  def parquetTableName: String = "Num" + totalNum + "_" + 
"comparetest_parquet"
    +
    +  def orcTableName: String = "Num" + totalNum + "_" + "comparetest_orc"
    +
    +  def carbonTableName(version: String): String =
    +    "Num" + totalNum + "_" + s"comparetest_carbonV$version"
    +
    +  // Table schema:
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | Column name | Data type | Cardinality | Column type | Dictionary |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | id          | string    | 100,000,000 | dimension   | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | city        | string    | 6           | dimension   | yes        |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | country     | string    | 6           | dimension   | yes        |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | planet      | string    | 10,007      | dimension   | yes        |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m1          | short     | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m2          | int       | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m3          | big int   | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m4          | double    | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  // | m5          | decimal   | NA          | measure     | no         |
    +  // +-------------+-----------+-------------+-------------+------------+
    +  /**
    +   * generate DataFrame with above table schema
    +   *
    +   * @param spark SparkSession
    +   * @return Dataframe of test data
    +   */
    +  private def generateDataFrame(spark: SparkSession): DataFrame = {
    +    val rdd = spark.sparkContext
    +      .parallelize(1 to totalNum, 4)
    +      .map { x =>
    +        ((x % 100000000).toString, "city" + x % 6, "country" + x % 6, 
"planet" + x % 10007,
    +          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
    +          BigDecimal.valueOf(x.toDouble / 11))
    +      }.map { x =>
    +      Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
    +    }
    +
    +    val schema = StructType(
    +      Seq(
    +        StructField("id", StringType, nullable = false),
    +        StructField("city", StringType, nullable = false),
    +        StructField("country", StringType, nullable = false),
    +        StructField("planet", StringType, nullable = false),
    +        StructField("m1", ShortType, nullable = false),
    +        StructField("m2", IntegerType, nullable = false),
    +        StructField("m3", LongType, nullable = false),
    +        StructField("m4", DoubleType, nullable = false),
    +        StructField("m5", DecimalType(30, 10), nullable = false)
    +      )
    +    )
    +
    +    val df = spark.createDataFrame(rdd, schema)
    +    println(s"Start generate ${df.count} records, schema: ${df.schema}")
    +    df
    +  }
    +
    +  // performance test queries, they are designed to test various data 
access type
    +  val r = new Random()
    +  lazy val tmpId = r.nextInt(cardinalityId) % totalNum
    +  lazy val tmpCity = "city" + (r.nextInt(cardinalityCity) % totalNum)
    +  // different query SQL
    +  lazy val queries: Array[Query] = Array(
    +    Query(
    +      "select * from $table" + s" where id = '$tmpId' ",
    +      "filter scan",
    +      "filter on high card dimension"
    +    )
    +    , Query(
    +      "select id from $table" + s" where id = '$tmpId' ",
    +      "filter scan",
    +      "filter on high card dimension"
    +    ),
    +    Query(
    +      "select city from $table" + s" where id = '$tmpId' ",
    +      "filter scan",
    +      "filter on high card dimension"
    +    ),
    +    Query(
    +      "select * from $table" + s" where city = '$tmpCity' limit 100",
    +      "filter scan",
    +      "filter on low card dimension, medium result set, fetch all columns"
    +    ),
    +
    +    Query(
    +      "select city from $table" + s" where city = '$tmpCity' limit 100",
    +      "filter scan",
    +      "filter on low card dimension"
    +    ),
    +
    +    Query(
    +      "select id from $table" + s" where city = '$tmpCity'  limit 100",
    +      "filter scan",
    +      "filter on low card dimension"
    +    ),
    +
    +    Query(
    +      "select country, sum(m1) from $table group by country",
    +      "aggregate",
    +      "group by on big data, on medium card column, medium result set,"
    +    ),
    +
    +    Query(
    +      "select country, sum(m1) from $table" +
    +        s" where id = '$tmpId' group by country",
    +      "aggregate",
    +      "group by on big data, on medium card column, medium result set,"
    +    ),
    +
    +    Query(
    +      "select t1.country, sum(t1.m1) from $table t1 join $table t2"
    +        + s" on t1.id = t2.id where t1.id = '$tmpId' group by t1.country",
    +      "aggregate",
    +      "group by on big data, on medium card column, medium result set,"
    +    ),
    +
    +    Query(
    +      "select t2.country, sum(t2.m1) " +
    +        "from $table t1 join $table t2 join $table t3 " +
    +        "join $table t4 join $table t5 join $table t6 join $table t7 " +
    +        s"on t1.id=t2.id and t1.id=t3.id and t1.id=t4.id " +
    +        s"and t1.id=t5.id and t1.id=t6.id and " +
    +        s"t1.id=t7.id " +
    +        s" where t2.id = '$tmpId' " +
    +        s" group by t2.country",
    +      "aggregate",
    +      "group by on big data, on medium card column, medium result set,"
    +    )
    +  )
    +
    +  /**
    +   * generate parquet format table
    +   *
    +   * @param spark SparkSession
    +   * @param input DataFrame
    +   * @param table table name
    +   * @return the time of generating parquet format table
    +   */
    +  private def generateParquetTable(spark: SparkSession, input: DataFrame, 
table: String)
    +  : Double = time {
    +    // partitioned by last 1 digit of id column
    +    val dfWithPartition = input.withColumn("partitionCol", 
input.col("id").%(10))
    +    dfWithPartition.write
    +      .partitionBy("partitionCol")
    +      .mode(SaveMode.Overwrite)
    +      .parquet(table)
    +  }
    +
    +  /**
    +   * generate ORC format table
    +   *
    +   * @param spark SparkSession
    +   * @param input DataFrame
    +   * @param table table name
    +   * @return the time of generating ORC format table
    +   */
    +  private def generateOrcTable(spark: SparkSession, input: DataFrame, 
table: String): Double =
    +    time {
    +      // partitioned by last 1 digit of id column
    +      input.write
    +        .mode(SaveMode.Overwrite)
    +        .orc(table)
    +    }
    +
    +  /**
    +   * generate carbon format table
    +   *
    +   * @param spark     SparkSession
    +   * @param input     DataFrame
    +   * @param tableName table name
    +   * @return the time of generating carbon format table
    +   */
    +  private def generateCarbonTable(spark: SparkSession, input: DataFrame, 
tableName: String)
    +  : Double = {
    +    CarbonProperties.getInstance().addProperty(
    +      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
    +      "3"
    +    )
    +    spark.sql(s"drop table if exists $tableName")
    +    time {
    +      input.write
    +        .format("carbondata")
    +        .option("tableName", tableName)
    +        .option("tempCSV", "false")
    +        .option("single_pass", "true")
    +        .option("dictionary_exclude", "id") // id is high cardinality 
column
    +        .option("table_blocksize", "32")
    +        .mode(SaveMode.Overwrite)
    +        .save()
    +    }
    +  }
    +
    +  /**
    +   * load data into parquet, carbonV2, carbonV3
    +   *
    +   * @param spark  SparkSession
    +   * @param table1 table1 name
    +   * @param table2 table2 name
    +   */
    +  def prepareTable(spark: SparkSession, table1: String, table2: String): 
Unit = {
    +    val df = if (generateFile) {
    +      generateDataFrame(spark).cache
    +    } else {
    +      null
    +    }
    +
    +    val table1Time = time {
    +      if (table1.endsWith("parquet")) {
    +        if (generateFile) {
    +          generateParquetTable(spark, df, table1)
    +        }
    +        spark.read.parquet(table1).createOrReplaceTempView(table1)
    +      } else if (table1.endsWith("orc")) {
    +        if (generateFile) {
    +          generateOrcTable(spark, df, table1)
    +          spark.read.orc(table1).createOrReplaceTempView(table1)
    +        }
    +      } else {
    +        sys.error("invalid table: " + table1)
    +      }
    +    }
    +    println(s"$table1 completed, time: $table1Time sec")
    +
    +    val table2Time: Double = if (generateFile) {
    +      generateCarbonTable(spark, df, table2)
    +    } else {
    +      0.0
    +    }
    +    println(s"$table2 completed, time: $table2Time sec")
    +    if (null != df) {
    +      df.unpersist()
    +    }
    +  }
    +
    +  /**
    +   * Run all queries for the specified table
    +   *
    +   * @param spark     SparkSession
    +   * @param tableName table name
    +   */
    +  private def runQueries(spark: SparkSession, tableName: String): Unit = {
    +    println()
    +    println(s"Start running queries for $tableName...")
    +    println(
    +      "Min: min time" +
    +        "\tMax: max time" +
    +        "\t90%: 90% time" +
    +        "\t99%: 99% time" +
    +        "\tAvg: average time" +
    +        "\tCount: number of result" +
    +        "\tQuery X: running different query sql" +
    +        "\tResult: show it when ResultIsEmpty is false" +
    +        "\tTotal execute time: total runtime")
    +    queries.zipWithIndex.map { case (query, index) =>
    +      val sqlText = query.sqlText.replace("$table", tableName)
    +
    +      val executorService = Executors.newFixedThreadPool(threadNum)
    +      val tasks = new java.util.ArrayList[Callable[Results]]()
    +      val tasksStartTime = System.nanoTime()
    +      for (num <- 1 to taskNum) {
    +        tasks.add(new QueryTask(spark, sqlText))
    +      }
    +      val results = executorService.invokeAll(tasks)
    +
    +      executorService.shutdown()
    +      executorService.awaitTermination(600, TimeUnit.SECONDS)
    +
    +      val tasksEndTime = System.nanoTime()
    +      val sql = s"Query ${index + 1}: $sqlText "
    +      printResults(results, sql, tasksStartTime)
    +      val taskTime = (tasksEndTime - tasksStartTime).toDouble / (1000 * 
1000 * 1000)
    +      println("Total execute time: " + taskTime.formatted("%.3f") + " s")
    +
    +      val timeString = new 
SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date())
    +      writeResults(spark, results, sql, tasksStartTime,
    +        path + s"/${tableName}_query${index + 1}_$timeString")
    +    }
    +  }
    +
    +  /**
    +   * save the result for subsequent  analysis
    +   *
    +   * @param spark    SparkSession
    +   * @param results  Results
    +   * @param sql      query sql
    +   * @param start    tasks start time
    +   * @param filePath write file path
    +   */
    +  def writeResults(
    +      spark: SparkSession,
    +      results: java.util.List[Future[Results]],
    +      sql: String = "",
    +      start: Long,
    +      filePath: String): Unit = {
    +    val timeArray = new Array[(Double, Double, Double)](results.size())
    +    for (i <- 0 until results.size()) {
    +      timeArray(i) =
    +        ((results.get(i).get().startTime - start) / (1000.0 * 1000),
    +          (results.get(i).get().endTime - start) / (1000.0 * 1000),
    +          (results.get(i).get().endTime - results.get(i).get().startTime) 
/ (1000.0 * 1000))
    +    }
    +    val timeArraySorted = timeArray.sortBy(x => x._1)
    +    val timeArrayString = timeArraySorted.map { e =>
    +      e._1.formatted("%.3f") + ",\t" + e._2.formatted("%.3f") + ",\t" + 
e._3.formatted("%.3f")
    +    }
    +    val saveArray = Array(sql, "startTime, endTime, runtime, measure time 
by the microsecond",
    +      s"${timeArrayString.length}")
    +      .union(timeArrayString)
    +    val rdd = spark.sparkContext.parallelize(saveArray, 1)
    +    rdd.saveAsTextFile(filePath)
    +  }
    +
    +  /**
    +   * print out results
    +   *
    +   * @param results        Results
    +   * @param sql            query sql
    +   * @param tasksStartTime tasks start time
    +   */
    +  def printResults(results: util.List[Future[Results]], sql: String = "", 
tasksStartTime: Long) {
    +    val timeArray = new Array[Double](results.size())
    +    val sqlResult = results.get(0).get().sqlResult
    +    for (i <- 0 until results.size()) {
    +      results.get(i).get()
    +    }
    +    for (i <- 0 until results.size()) {
    +      timeArray(i) = results.get(i).get().time
    +    }
    +    val sortTimeArray = timeArray.sorted
    +
    +    // the time of 90 percent sql are finished
    +    val time90 = ((sortTimeArray.length) * 0.9).toInt - 1
    +    // the time of 99 percent sql are finished
    +    val time99 = ((sortTimeArray.length) * 0.99).toInt - 1
    +    print(
    +      "Min: " + sortTimeArray.head.formatted("%.3f") + " s," +
    +        "\tMax: " + sortTimeArray.last.formatted("%.3f") + " s," +
    +        "\t90%: " + sortTimeArray(time90).formatted("%.3f") + " s," +
    +        "\t99%: " + sortTimeArray(time99).formatted("%.3f") + " s," +
    +        "\tAvg: " + (timeArray.sum / timeArray.length).formatted("%.3f") + 
" s," +
    +        "\t\tCount: " + results.get(0).get.count +
    +        "\t\t\t\t" + sql +
    +        "\t" + sqlResult.mkString(",") + "\t")
    +  }
    +
    +  /**
    +   * save result after finishing each task/thread
    +   *
    +   * @param time      each task time of executing query sql  and with 
millis time
    +   * @param sqlResult query sql result
    +   * @param count     result count
    +   * @param startTime task start time with nano time
    +   * @param endTime   task end time with nano time
    +   */
    +  case class Results(
    +      time: Double,
    +      sqlResult: Array[Row],
    +      count: Int,
    +      startTime: Long,
    +      endTime: Long)
    +
    +
    +  class QueryTask(spark: SparkSession, query: String)
    +    extends Callable[Results] with Serializable {
    +    override def call(): Results = {
    +      var result: Array[Row] = null
    +      val startTime = System.nanoTime()
    +      val rt = time {
    +        result = spark.sql(query).collect()
    --- End diff --
    
    It is better not to collect the result, it will impact gc a lot. 
    I think doing a `count` is better.


---

Reply via email to