This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 404c022  [CARBONDATA-3244] Add benchmark for Change Data Capture 
scenario
404c022 is described below

commit 404c02222be243cf3d1f225b01bd3642995f46ba
Author: Jacky Li <jacky.li...@qq.com>
AuthorDate: Fri Jan 11 00:44:58 2019 +0800

    [CARBONDATA-3244] Add benchmark for Change Data Capture scenario
    
    CDC (change data capture) is a common scenario for analyzing slowly changed 
table in data warehouse.
    It is good to add benchmark test comparing two update methods:
    1. hive_solution, which uses INSERT OVERWRITE. This is a popular method for 
hive warehouse.
    2. carbon_solution, which uses CarbonData's update syntax to update the 
history table directly.
    
    This closes #3066
---
 .../carbondata/benchmark/SCDType2Benchmark.scala   | 300 +++++++++++++++++++++
 1 file changed, 300 insertions(+)

diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SCDType2Benchmark.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SCDType2Benchmark.scala
new file mode 100644
index 0000000..a2e1b6e
--- /dev/null
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/benchmark/SCDType2Benchmark.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.benchmark
+
+import java.io.File
+import java.sql.Date
+
+import org.apache.commons.lang3.time.DateUtils
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+
+/**
+ * Benchmark for SCD (Slowly Changing Dimension) Type 2 performance.
+ * See [[https://en.wikipedia.org/wiki/Slowly_changing_dimension]]
+ *
+ * The benchmark shows performance of two update methods:
+ * 1. overwrite_solution, which uses INSERT OVERWRITE. This is a popular 
method for hive warehouse.
+ * 2. update_solution, which uses CarbonData's update syntax to update the 
history table directly.
+ *
+ * When running on an 8-core laptop, the benchmark shows:
+ *
+ * 1. test one
+ * History table 1M records, update 10K records every day and insert 10K records every day,
+ * simulated 3 days.
+ * overwrite_solution: total process time takes 13s
+ * update_solution: total process time takes 6s
+ *
+ *
+ * 2. test two
+ * History table 10M records, update 10K records every day and insert 10K records every day,
+ * simulated 3 days.
+ * overwrite_solution: total process time takes 115s
+ * update_solution: total process time takes 15s
+ *
+ */
+// scalastyle:off println
+object SCDType2Benchmark {
+
+  // Schema for history table
+  // Table name: dw_order_solution1 (using overwrite), dw_order_solution2 (using update)
+  // +-------------+-----------+-------------+
+  // | Column name | Data type | Cardinality |
+  // +-------------+-----------+-------------+
+  // | order_id    | string    | 10,000,000  |
+  // +-------------+-----------+-------------+
+  // | customer_id | string    | 10,000,000  |
+  // +-------------+-----------+-------------+
+  // | start_date  | date      | NA          |
+  // +-------------+-----------+-------------+
+  // | end_date    | date      | NA          |
+  // +-------------+-----------+-------------+
+  // | state       | int       | 4           |
+  // +-------------+-----------+-------------+
+  case class Order (order_id: String, customer_id: String, start_date: Date, end_date: Date,
+      state: Int)
+
+  // Schema for Change table which is used for update to history table every day
+  // Table name: change
+  // +-------------+-----------+-------------+
+  // | Column name | Data type | Cardinality |
+  // +-------------+-----------+-------------+
+  // | order_id    | string    | 10,000,000  |
+  // +-------------+-----------+-------------+
+  // | customer_id | string    | 10,000,000  |
+  // +-------------+-----------+-------------+
+  // | update_date | date      | NA          |
+  // +-------------+-----------+-------------+
+  // | state       | int       | 4           |
+  // +-------------+-----------+-------------+
+  case class Change (order_id: String, customer_id: String, update_date: Date, state: Int)
+
+  // number of records for first day
+  val numOrders = 10000000
+
+  // number of records to update every day
+  val numUpdateOrdersDaily = 10000
+
+  // number of new records to insert every day
+  // (the name is kept as-is so that existing references keep compiling)
+  val newNewOrdersDaily = 10000
+
+  // number of days to simulate
+  val numDays = 3
+
+  // print every day's result to console or not
+  val printDetail = false
+
+  // end_date marking a history record as "still current"; it is shared by the generated
+  // day-0 data and by both solutions so that the two history tables stay comparable
+  private val OpenEndDate = "9999-01-01"
+
+  /**
+   * Generate the initial history data: one open-ended record in state 1 per order.
+   *
+   * @param sparkSession session used to build the DataFrame
+   * @param numOrders number of orders to generate (order1 .. orderN)
+   * @param startDate start_date assigned to every generated record
+   * @return DataFrame with the schema of [[Order]]
+   */
+  def generateDataForDay0(
+      sparkSession: SparkSession,
+      numOrders: Int = 1000000,
+      startDate: Date = Date.valueOf("2018-05-01")): DataFrame = {
+    import sparkSession.implicits._
+    // resolved outside of the closure so that only a plain Date is captured
+    val endDate = Date.valueOf(OpenEndDate)
+    sparkSession.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => Order(s"order$x", s"customer$x", startDate, endDate, 1) }
+      .toDS().toDF()
+  }
+
+  /**
+   * Generate the change data of one day: updates to existing orders plus new orders.
+   *
+   * @param sparkSession session used to build the DataFrame
+   * @param numUpdatedOrders number of existing orders (order1 .. orderN) to update
+   * @param startDate currently unused, kept for source compatibility with callers
+   * @param updateDate update_date assigned to every generated change record
+   * @param newState state assigned to the updated orders
+   * @param numNewOrders number of new orders to insert, they are created in state 1
+   * @return DataFrame with the schema of [[Change]]
+   */
+  def generateDailyChange(
+      sparkSession: SparkSession,
+      numUpdatedOrders: Int,
+      startDate: Date,
+      updateDate: Date,
+      newState: Int,
+      numNewOrders: Int
+      ): DataFrame = {
+    import sparkSession.implicits._
+    // data for update to the history table
+    val updates = sparkSession.sparkContext.parallelize(1 to numUpdatedOrders, 4)
+      .map { x => Change(s"order$x", s"customer$x", updateDate, newState) }
+      .toDS().toDF()
+    // data for insert to the history table
+    // The id combines the day with the row number: using only the wall clock would give
+    // the same order_id to every row generated within the same millisecond.
+    val batchId = updateDate.getTime
+    val inserts = sparkSession.sparkContext.parallelize(1 to numNewOrders, 4)
+      .map { x => Change(s"newOrder${batchId}_$x", s"customer$x", updateDate, 1) }
+      .toDS().toDF()
+    // union them so that the Change table contains both data for update and insert
+    updates.union(inserts)
+  }
+
+  /**
+   * Entry point: creates a local CarbonSession, runs the benchmark and always closes
+   * the session afterwards, even when the benchmark fails.
+   */
+  def main(args: Array[String]): Unit = {
+    import org.apache.spark.sql.CarbonSession._
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+
+    val spark = SparkSession
+      .builder()
+      .master("local[8]")
+      .enableHiveSupport()
+      .config("spark.sql.warehouse.dir", s"$rootPath/examples/spark2/target/warehouse")
+      .getOrCreateCarbonSession()
+    spark.sparkContext.setLogLevel("error")
+
+    try {
+      runBenchmark(spark)
+    } finally {
+      spark.close()
+    }
+  }
+
+  /**
+   * Load the day-0 data into both history tables, apply numDays daily changes with each
+   * solution, run one query per table and print the accumulated timings.
+   */
+  private def runBenchmark(spark: SparkSession): Unit = {
+    // prepare base data for first day
+    spark.sql("drop table if exists dw_order_solution1")
+    spark.sql("drop table if exists dw_order_solution2")
+    spark.sql("drop table if exists change")
+
+    val firstDay = Date.valueOf("2018-05-01")
+    val baseData = generateDataForDay0(
+      sparkSession = spark,
+      numOrders = numOrders,
+      startDate = firstDay)
+
+    baseData.write
+      .format("carbondata")
+      .option("tableName", "dw_order_solution1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    baseData.write
+      .format("carbondata")
+      .option("tableName", "dw_order_solution2")
+      .option("sort_columns", "order_id")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    if (printDetail) {
+      println("## day0")
+      // both history tables hold the same data at this point, show one of them
+      spark.sql("select * from dw_order_solution1").show(100, false)
+    }
+
+    // one (overwrite nanos, update nanos) pair per simulated day
+    val dailyTimes = (1 to numDays).map { i =>
+      // prepare for incremental update data for day-i
+      val startDate = new Date(DateUtils.addDays(firstDay, i - 1).getTime)
+      val newDate = new Date(DateUtils.addDays(firstDay, i).getTime)
+      val changeData = generateDailyChange(
+        sparkSession = spark,
+        numUpdatedOrders = numUpdateOrdersDaily,
+        startDate = startDate,
+        updateDate = newDate,
+        // day 0 loaded state 1, so day-i moves the updated orders to state i + 1
+        newState = i + 1,
+        numNewOrders = newNewOrdersDaily)
+      changeData.write
+        .format("carbondata")
+        .option("tableName", "change")
+        .mode(SaveMode.Overwrite)
+        .save()
+
+      if (printDetail) {
+        println(s"day$i Change")
+        spark.sql("select * from change").show(100, false)
+      }
+
+      // apply Change to history table by using INSERT OVERWRITE
+      val overwriteNanos = timeIt(spark)(solution1)
+
+      // apply Change to history table by using UPDATE
+      val updateNanos = timeIt(spark)(solution2)
+
+      if (printDetail) {
+        println(s"day$i result")
+        spark.sql("select * from dw_order_solution1").show(false)
+        spark.sql("select * from dw_order_solution2").show(false)
+      }
+
+      (overwriteNanos, updateNanos)
+    }
+    val solution1UpdateTime = dailyTimes.map(_._1).sum
+    val solution2UpdateTime = dailyTimes.map(_._2).sum
+
+    // do a query after apply SCD Type2 update
+    val solution1QueryTime = timeQuery(spark, "dw_order_solution1")
+    val solution2QueryTime = timeQuery(spark, "dw_order_solution2")
+
+    // print update time
+    println(s"overwrite solution update takes ${toSeconds(solution1UpdateTime)} s")
+    println(s"update solution update takes ${toSeconds(solution2UpdateTime)} s")
+
+    // print query time
+    println(s"overwrite solution query takes ${toSeconds(solution1QueryTime)} s")
+    println(s"update solution query takes ${toSeconds(solution2QueryTime)} s")
+  }
+
+  /**
+   * Run func against the session and return the elapsed wall-clock time in nanoseconds.
+   */
+  private def timeIt(spark: SparkSession)(func: SparkSession => Unit): Long = {
+    val start = System.nanoTime()
+    func(spark)
+    System.nanoTime() - start
+  }
+
+  /**
+   * Time the same aggregation query against the given history table, in nanoseconds.
+   */
+  private def timeQuery(spark: SparkSession, tableName: String): Long = {
+    timeIt(spark) { session =>
+      session.sql(
+        s"""
+           | select sum(state) as sum, customer_id
+           | from $tableName
+           | group by customer_id
+           | order by sum
+           | limit 10
+           |""".stripMargin).collect()
+    }
+  }
+
+  /**
+   * Convert nanoseconds to whole seconds, truncating the fraction.
+   */
+  private def toSeconds(nanos: Long): Long = nanos / 1000 / 1000 / 1000
+
+  /**
+   * Typical solution when using hive
+   * This solution uses INSERT OVERWRITE to rewrite the whole table every day:
+   * history records matched by a change get their end_date closed, and every
+   * change record is appended as a new open-ended record.
+   */
+  private def solution1(spark: SparkSession): Unit = {
+    spark.sql(
+      s"""
+        | insert overwrite table dw_order_solution1
+        | select * from
+        | (
+        |   select A.order_id, A.customer_id, A.start_date,
+        |     case when A.end_date > B.update_date then B.update_date
+        |     else A.end_date
+        |     end as end_date,
+        |   A.state
+        |   from dw_order_solution1 A
+        |   left join change B
+        |   on A.order_id = B.order_id
+        |   union all
+        |     select B.order_id, B.customer_id, B.update_date, date('$OpenEndDate'), B.state
+        |     from change B
+        | ) T
+      """.stripMargin)
+  }
+
+  /**
+   * Solution leveraging carbon's update syntax: close the end_date of the matched
+   * history records in place, then append every change record as a new open-ended
+   * record.
+   */
+  private def solution2(spark: SparkSession): Unit = {
+    spark.sql(
+      """
+        | update dw_order_solution2 A
+        | set (A.end_date) =
+        |   (select B.update_date
+        |   from change B
+        |   where A.order_id = B.order_id and A.end_date > B.update_date)
+      """.stripMargin).show()
+    // uses the same open end_date as the day-0 data and solution1
+    spark.sql(
+      s"""
+        | insert into dw_order_solution2
+        | select B.order_id, B.customer_id, B.update_date, date('$OpenEndDate'), B.state
+        | from change B
+      """.stripMargin)
+  }
+}
+// scalastyle:on println

Reply via email to