[CARBONDATA-2098] Add Documentation for Pre-Aggregate tables
This closes #1886

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/71f8828b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/71f8828b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/71f8828b

Branch: refs/heads/branch-1.3
Commit: 71f8828be56ae9f3927a5fc4a5047794a740c6d1
Parents: da129d5
Author: Raghunandan S <carbondatacontributi...@gmail.com>
Authored: Mon Jan 29 08:54:49 2018 +0530
Committer: chenliang613 <chenliang...@huawei.com>
Committed: Sat Feb 3 15:45:30 2018 +0800

----------------------------------------------------------------------
 docs/data-management-on-carbondata.md            | 245 +++++++++++++++++++
 .../examples/PreAggregateTableExample.scala      | 145 +++++++++++
 .../TimeSeriesPreAggregateTableExample.scala     | 103 ++++++++
 3 files changed, 493 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/docs/data-management-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/data-management-on-carbondata.md b/docs/data-management-on-carbondata.md
index 3119935..0b35ed9 100644
--- a/docs/data-management-on-carbondata.md
+++ b/docs/data-management-on-carbondata.md
@@ -25,6 +25,7 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
 * [UPDATE AND DELETE](#update-and-delete)
 * [COMPACTION](#compaction)
 * [PARTITION](#partition)
+* [PRE-AGGREGATE TABLES](#pre-aggregate-tables)
 * [BUCKETING](#bucketing)
 * [SEGMENT MANAGEMENT](#segment-management)

@@ -748,6 +749,250 @@ This tutorial is going to introduce all commands and data operations on CarbonDa
   * The partitioned column can be excluded from SORT_COLUMNS, this will let other columns to do the efficient sorting.
   * When writing SQL on a partition table, try to use filters on the partition column.
+## PRE-AGGREGATE TABLES
+  CarbonData supports pre-aggregating data so that OLAP-style queries can fetch data
+  much faster. Aggregate tables are created as datamaps so that their handling is as efficient
+  as other indexing support. Users can create as many aggregate tables as they require as
+  datamaps to improve their query performance, provided the storage requirements and loading
+  speeds are acceptable.
+
+  For a main table called **sales**, which is defined as
+
+  ```
+  CREATE TABLE sales (
+    order_time timestamp,
+    user_id string,
+    sex string,
+    country string,
+    quantity int,
+    price bigint)
+  STORED BY 'carbondata'
+  ```
+
+  users can create pre-aggregate tables using the DDL
+
+  ```
+  CREATE DATAMAP agg_sales
+  ON TABLE sales
+  USING "preaggregate"
+  AS
+    SELECT country, sex, sum(quantity), avg(price)
+    FROM sales
+    GROUP BY country, sex
+  ```
+
+<b><p align="left">Functions supported in pre-aggregate tables</p></b>
+
+| Function | Rollup supported |
+|----------|------------------|
+| SUM      | Yes              |
+| AVG      | Yes              |
+| MAX      | Yes              |
+| MIN      | Yes              |
+| COUNT    | Yes              |
+
+
+##### How pre-aggregate tables are selected
+For the main table **sales** and the pre-aggregate table **agg_sales** created above, queries of the
+kind
+```
+SELECT country, sex, sum(quantity), avg(price) from sales GROUP BY country, sex
+
+SELECT sex, sum(quantity) from sales GROUP BY sex
+
+SELECT sum(price), country from sales GROUP BY country
+```
+
+will be transformed by the query planner to fetch data from the pre-aggregate table **agg_sales**,
+
+but queries of the kind
+```
+SELECT user_id, country, sex, sum(quantity), avg(price) from sales GROUP BY country, sex
+
+SELECT sex, avg(quantity) from sales GROUP BY sex
+
+SELECT max(price), country from sales GROUP BY country
+```
+
+will fetch the data from the main table **sales**.
+
+##### Loading data to pre-aggregate tables
+For an existing table with loaded data, the data load to the pre-aggregate table will be triggered
+by the CREATE DATAMAP statement when the user creates the pre-aggregate table.
+For incremental loads after the aggregate tables are created, loading data into the main table
+triggers a load into the pre-aggregate tables once the main table load is complete. These loads
+are atomic, meaning that data in the main table and the aggregate tables becomes visible to the
+user only after all tables are loaded.
+
+##### Querying data from pre-aggregate tables
+Pre-aggregate tables cannot be queried directly. Queries are to be made on the main table.
+Internally, CarbonData checks the pre-aggregate tables associated with the main table, and if a
+pre-aggregate table satisfies the query, the plan is transformed automatically to use the
+pre-aggregate table to fetch the data.
+
+##### Compacting pre-aggregate tables
+Compaction is an optional operation for pre-aggregate tables. If compaction is performed on the
+main table but not on a pre-aggregate table, all queries can still benefit from the pre-aggregate
+table. To further improve performance on pre-aggregate tables, compaction can be triggered on
+pre-aggregate tables directly; it will merge the segments inside the pre-aggregate table.
+To do that, use the ALTER TABLE COMPACT command on the pre-aggregate table just like on the main
+table.
+
+  NOTE:
+  * If the aggregate functions used in the pre-aggregate table creation included distinct-count,
+    the pre-aggregate table values are recomputed during compaction. This would be a costly
+    operation compared to the compaction of pre-aggregate tables containing only the other
+    aggregate functions.
+
+##### Update/Delete Operations on pre-aggregate tables
+This functionality is not supported.
+
+  NOTE (<b>RESTRICTION</b>):
+  * Update/delete operations are <b>not supported</b> on a main table which has pre-aggregate
+    tables created on it. All the pre-aggregate tables <b>will have to be dropped</b> before
+    update/delete operations can be performed on the main table. Pre-aggregate tables can be
+    rebuilt manually after the update/delete operations are completed.
+
+##### Delete Segment Operations on pre-aggregate tables
+This functionality is not supported.
+
+  NOTE (<b>RESTRICTION</b>):
+  * Delete segment operations are <b>not supported</b> on a main table which has pre-aggregate
+    tables created on it. All the pre-aggregate tables <b>will have to be dropped</b> before
+    delete segment operations can be performed on the main table. Pre-aggregate tables can be
+    rebuilt manually after the delete segment operations are completed.
+
+##### Alter Table Operations on pre-aggregate tables
+This functionality is not supported.
+
+  NOTE (<b>RESTRICTION</b>):
+  * Adding a new column to the main table does not have any effect on the pre-aggregate tables.
+    However, if dropping or renaming a column has an impact on a pre-aggregate table, such
+    operations will be rejected and an error will be thrown. All the pre-aggregate tables
+    <b>will have to be dropped</b> before alter operations can be performed on the main table.
+    Pre-aggregate tables can be rebuilt manually after the ALTER TABLE operations are completed.
+
+### Supporting timeseries data
+CarbonData has built-in understanding of the time hierarchy and its levels: year, month, day,
+hour, minute.
+Multiple pre-aggregate tables can be created for the hierarchy, and CarbonData can do automatic
+roll-up for the queries on these hierarchies.
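The hierarchy roll-up described above amounts to truncating the event timestamp to the chosen level before grouping. As a plain-Scala sketch of that idea (illustrative only — `truncateTo` is a hypothetical helper built on `java.time`, not a CarbonData API):

```scala
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit

// Hypothetical helper: truncate an event time to one level of the
// year/month/day/hour/minute hierarchy described above.
def truncateTo(t: LocalDateTime, level: String): LocalDateTime = level match {
  case "year"   => t.truncatedTo(ChronoUnit.DAYS).withDayOfYear(1)
  case "month"  => t.truncatedTo(ChronoUnit.DAYS).withDayOfMonth(1)
  case "day"    => t.truncatedTo(ChronoUnit.DAYS)
  case "hour"   => t.truncatedTo(ChronoUnit.HOURS)
  case "minute" => t.truncatedTo(ChronoUnit.MINUTES)
}

// All events that truncate to the same key fall into the same
// pre-aggregated row for that granularity.
val event = LocalDateTime.of(2018, 2, 3, 15, 45, 30)
val hourKey = truncateTo(event, "hour")   // 2018-02-03T15:00
```

Each granularity in the DMPROPERTIES below selects one such truncation level; rows sharing a truncated key are combined into a single row of that level's aggregate table.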
+
+  ```
+  CREATE DATAMAP agg_year
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+    'event_time'='order_time',
+    'year_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+    avg(price) FROM sales GROUP BY order_time, country, sex
+
+  CREATE DATAMAP agg_month
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+    'event_time'='order_time',
+    'month_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+    avg(price) FROM sales GROUP BY order_time, country, sex
+
+  CREATE DATAMAP agg_day
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+    'event_time'='order_time',
+    'day_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+    avg(price) FROM sales GROUP BY order_time, country, sex
+
+  CREATE DATAMAP agg_sales_hour
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+    'event_time'='order_time',
+    'hour_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+    avg(price) FROM sales GROUP BY order_time, country, sex
+
+  CREATE DATAMAP agg_minute
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+    'event_time'='order_time',
+    'minute_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+    avg(price) FROM sales GROUP BY order_time, country, sex
+  ```
+
+  For querying data and automatically rolling it up to the desired aggregation level, CarbonData
+  supports the UDF
+  ```
+  timeseries(timeseries column name, 'aggregation level')
+  ```
+  ```
+  Select timeseries(order_time, 'hour'), sum(quantity) from sales group by
+    timeseries(order_time, 'hour')
+  ```
+
+  It is **not necessary** to create pre-aggregate tables for each granularity unless required by
+  queries. CarbonData can roll up the data and fetch it.
+
+  For example, for the main table **sales**, if pre-aggregate tables were created as
+
+  ```
+  CREATE DATAMAP agg_day
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+    'event_time'='order_time',
+    'day_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+    avg(price) FROM sales GROUP BY order_time, country, sex
+
+  CREATE DATAMAP agg_sales_hour
+  ON TABLE sales
+  USING "timeseries"
+  DMPROPERTIES (
+    'event_time'='order_time',
+    'hour_granularity'='1'
+  ) AS
+  SELECT order_time, country, sex, sum(quantity), max(quantity), count(user_id), sum(price),
+    avg(price) FROM sales GROUP BY order_time, country, sex
+  ```
+
+  then queries like the ones below will be rolled up and fetched from the pre-aggregate tables
+  ```
+  Select timeseries(order_time, 'month'), sum(quantity) from sales group by
+    timeseries(order_time, 'month')
+
+  Select timeseries(order_time, 'year'), sum(quantity) from sales group by
+    timeseries(order_time, 'year')
+  ```
+
+  NOTE (<b>RESTRICTION</b>):
+  * Only a value of 1 is supported for hierarchy levels.
+  * Pre-aggregate tables for the desired levels need to be created one after the other.
+  * Pre-aggregate tables created for each level need to be dropped separately.
+
+
 ## BUCKETING

   Bucketing feature can be used to distribute/organize the table/partition data into multiple files such

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
new file mode 100644
index 0000000..fe3a93d
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/PreAggregateTableExample.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SaveMode
+
+/**
+ * This example is for pre-aggregate tables.
+ */
+
+object PreAggregateTableExample {
+
+  def main(args: Array[String]) {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/sample.csv"
+    val spark = ExampleUtils.createCarbonSession("PreAggregateTableExample")
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    // 1. simple usage: pre-aggregate table creation and query
+    spark.sql("DROP TABLE IF EXISTS mainTable")
+    spark.sql("""
+      | CREATE TABLE mainTable
+      | (id Int,
+      | name String,
+      | city String,
+      | age Int)
+      | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    spark.sql(s"""
+      LOAD DATA LOCAL INPATH '$testData' into table mainTable
+      """)
+
+    spark.sql(
+      s"""create datamap preagg_sum on table mainTable using 'preaggregate' as
+         | select id,sum(age) from mainTable group by id"""
+        .stripMargin)
+    spark.sql(
+      s"""create datamap preagg_avg on table mainTable using 'preaggregate' as
+         | select id,avg(age) from mainTable group by id"""
+        .stripMargin)
+    spark.sql(
+      s"""create datamap preagg_count on table mainTable using 'preaggregate' as
+         | select id,count(age) from mainTable group by id"""
+        .stripMargin)
+    spark.sql(
+      s"""create datamap preagg_min on table mainTable using 'preaggregate' as
+         | select id,min(age) from mainTable group by id"""
+        .stripMargin)
+    spark.sql(
+      s"""create datamap preagg_max on table mainTable using 'preaggregate' as
+         | select id,max(age) from mainTable group by id"""
+        .stripMargin)
+
+    spark.sql(
+      s"""
+         | SELECT id,max(age)
+         | FROM mainTable group by id
+      """.stripMargin).show()
+
+    // 2. compare the performance: with pre-aggregate table VS main table only
+
+    // build test data; if the data is larger than 100M, it will take 10+ mins
+    import spark.implicits._
+
+    import scala.util.Random
+    val r = new Random()
+    val df = spark.sparkContext.parallelize(1 to 10 * 1000 * 1000)
+      .map(x => ("No." + r.nextInt(100000), "name" + x % 8, "city" + x % 50, x % 60))
+      .toDF("ID", "name", "city", "age")
+
+    // create the table which will get a pre-aggregate table
+    df.write.format("carbondata")
+      .option("tableName", "personTable")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite).save()
+
+    // create the table without a pre-aggregate table
+    df.write.format("carbondata")
+      .option("tableName", "personTableWithoutAgg")
+      .option("compress", "true")
+      .mode(SaveMode.Overwrite).save()
+
+    // create the pre-aggregate table
+    spark.sql("""
+      CREATE datamap preagg_avg on table personTable using 'preaggregate' as
+      | select id,avg(age) from personTable group by id
+      """.stripMargin)
+
+    // timing helper: returns the elapsed time of `code` in seconds
+    def time(code: => Unit): Double = {
+      val start = System.currentTimeMillis()
+      code
+      (System.currentTimeMillis() - start).toDouble / 1000
+    }
+
+    val time_without_aggTable = time {
+      spark.sql(
+        s"""
+           | SELECT id, avg(age)
+           | FROM personTableWithoutAgg group by id
+        """.stripMargin).count()
+    }
+
+    val time_with_aggTable = time {
+      spark.sql(
+        s"""
+           | SELECT id, avg(age)
+           | FROM personTable group by id
+        """.stripMargin).count()
+    }
+    // scalastyle:off
+    println("time for query on table with pre-aggregate table: " + time_with_aggTable.toString)
+    println("time for query on table without pre-aggregate table: " + time_without_aggTable.toString)
+    // scalastyle:on
+
+    spark.sql("DROP TABLE IF EXISTS mainTable")
+    spark.sql("DROP TABLE IF EXISTS personTable")
+    spark.sql("DROP TABLE IF EXISTS personTableWithoutAgg")
+
+    spark.close()
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/71f8828b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
new file mode 100644
index 0000000..470d9ff
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/TimeSeriesPreAggregateTableExample.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.spark.sql.SaveMode
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * This example is for time series pre-aggregate tables.
+ */
+
+object TimeSeriesPreAggregateTableExample {
+
+  def main(args: Array[String]) {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+      + "../../../..").getCanonicalPath
+    val testData = s"$rootPath/integration/spark-common-test/src/test/resources/timeseriestest.csv"
+    val spark = ExampleUtils.createCarbonSession("TimeSeriesPreAggregateTableExample")
+
+    spark.sparkContext.setLogLevel("ERROR")
+
+    import spark.implicits._
+
+    import scala.util.Random
+    val r = new Random()
+    // generate random rows of the form ("20xx-MM-dd HH:mm:ss", name, age)
+    val df = spark.sparkContext.parallelize(1 to 10 * 1000)
+      .map(x => ("" + 20 + "%02d".format(r.nextInt(20)) + "-" + "%02d".format(r.nextInt(11) + 1) +
+        "-" + "%02d".format(r.nextInt(27) + 1) + " " + "%02d".format(r.nextInt(12)) + ":" +
+        "%02d".format(r.nextInt(59)) + ":" + "%02d".format(r.nextInt(59)), "name" + x % 8,
+        r.nextInt(60))).toDF("mytime", "name", "age")
+
+    // 1. usage for time series pre-aggregate table creation and query
+    spark.sql("drop table if exists timeSeriesTable")
+    spark.sql("CREATE TABLE timeSeriesTable(mytime timestamp," +
+      " name string, age int) STORED BY 'org.apache.carbondata.format'")
+    spark.sql(
+      s"""
+         | CREATE DATAMAP agg0_hour ON TABLE timeSeriesTable
+         | USING 'timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'HOUR_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM timeSeriesTable
+         | GROUP BY mytime
+      """.stripMargin)
+    spark.sql(
+      s"""
+         | CREATE DATAMAP agg0_day ON TABLE timeSeriesTable
+         | USING 'timeSeries'
+         | DMPROPERTIES (
+         | 'EVENT_TIME'='mytime',
+         | 'DAY_GRANULARITY'='1')
+         | AS SELECT mytime, SUM(age) FROM timeSeriesTable
+         | GROUP BY mytime
+      """.stripMargin)
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss")
+
+    df.write.format("carbondata")
+      .option("tableName", "timeSeriesTable")
+      .option("compress", "true")
+      .mode(SaveMode.Append).save()
+
+    spark.sql(
+      s"""
+        select sum(age), timeseries(mytime,'hour') from timeSeriesTable
+        group by timeseries(mytime, 'hour')
+      """.stripMargin).show()
+
+    spark.sql(
+      s"""
+        select avg(age), timeseries(mytime,'year') from timeSeriesTable
+        group by timeseries(mytime, 'year')
+      """.stripMargin).show()
+
+    spark.sql("DROP TABLE IF EXISTS timeSeriesTable")
+
+    spark.close()
+
+  }
+}
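The automatic roll-up these examples rely on can be pictured outside of Spark as re-grouping already-aggregated rows: once data is summed at hour level, coarser levels can be served by summing the hourly partial sums again. A minimal plain-Scala sketch (hypothetical helper names, independent of CarbonData internals):

```scala
// Hypothetical sketch of SUM roll-up: hourly partial sums keyed by
// "yyyy-MM-dd HH" are re-grouped to day level and summed again.
def rollUp(hourly: Map[String, Long], coarser: String => String): Map[String, Long] =
  hourly.groupBy { case (key, _) => coarser(key) }
    .map { case (day, rows) => day -> rows.values.sum }

val hourlySums = Map(
  "2018-02-03 14" -> 10L,
  "2018-02-03 15" -> 5L,
  "2018-02-04 09" -> 7L)

// The day key is the "yyyy-MM-dd" prefix of the hour key.
val dailySums = rollUp(hourlySums, _.take(10))
```

This re-grouping is why SUM, MIN, MAX, and COUNT roll up directly, whereas a rolled-up AVG typically has to be derived from stored sum and count pairs rather than from the averages themselves.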