This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 840b07e  [HUDI-589][DOCS] Fix querying_data page (#1333)
840b07e is described below

commit 840b07ee4452e1f0654a32cb32cd7bed3279edcf
Author: Bhavani Sudha Saktheeswaran <bhasu...@uber.com>
AuthorDate: Mon Mar 2 11:09:41 2020 -0800

    [HUDI-589][DOCS] Fix querying_data page (#1333)

    - Added support matrix for COW and MOR tables
    - Change reference from (`views`|`pulls`) to `queries`
    - And minor restructuring
---
 docs/_docs/2_3_querying_data.md | 131 ++++++++++++++++++++++------------------
 1 file changed, 73 insertions(+), 58 deletions(-)

diff --git a/docs/_docs/2_3_querying_data.md b/docs/_docs/2_3_querying_data.md
index 0ee5e17..1a2ae08 100644
--- a/docs/_docs/2_3_querying_data.md
+++ b/docs/_docs/2_3_querying_data.md
@@ -9,7 +9,7 @@ last_modified_at: 2019-12-30T15:59:57-04:00
 Conceptually, Hudi stores data physically once on DFS, while providing 3 different ways of querying, as explained [before](/docs/concepts.html#query-types).
 Once the table is synced to the Hive metastore, it provides external Hive tables backed by Hudi's custom inputformats. Once the proper hudi
-bundle has been provided, the table can be queried by popular query engines like Hive, Spark and Presto.
+bundle has been provided, the table can be queried by popular query engines like Hive, Spark SQL, Spark datasource and Presto.
 Specifically, following Hive tables are registered based off [table name](/docs/configurations.html#TABLE_NAME_OPT_KEY)
 and [table type](/docs/configurations.html#TABLE_TYPE_OPT_KEY) passed during write.
@@ -24,31 +24,49 @@ If `table name = hudi_trips` and `table type = MERGE_ON_READ`, then we get:
 As discussed in the concepts section, the one key primitive needed for [incrementally processing](https://www.oreilly.com/ideas/ubers-case-for-incremental-processing-on-hadoop),
-is `incremental pulls` (to obtain a change stream/log from a table). Hudi tables can be pulled incrementally, which means you can get ALL and ONLY the updated & new rows
-since a specified instant time. This, together with upserts, are particularly useful for building data pipelines where 1 or more source Hudi tables are incrementally pulled (streams/facts),
-joined with other tables (tables/dimensions), to [write out deltas](/docs/writing_data.html) to a target Hudi table. Incremental view is realized by querying one of the tables above,
+is obtaining a change stream/log from a table. Hudi tables can be queried incrementally, which means you can get ALL and ONLY the updated & new rows
+since a specified instant time. This, together with upserts, is particularly useful for building data pipelines where 1 or more source Hudi tables are incrementally queried (streams/facts),
+joined with other tables (tables/dimensions), to [write out deltas](/docs/writing_data.html) to a target Hudi table. Incremental queries are realized by querying one of the tables above,
 with special configurations that indicates to query planning that only incremental data needs to be fetched out of the table.
-In sections, below we will discuss how to access these query types from different query engines.
+
+## SUPPORT MATRIX
+
+### COPY_ON_WRITE tables
+
+||Snapshot|Incremental|Read Optimized|
+||--------|-----------|--------------|
+|**Hive**|Y|Y|N/A|
+|**Spark SQL**|Y|Y|N/A|
+|**Spark datasource**|Y|Y|N/A|
+|**Presto**|Y|N|N/A|
+
+### MERGE_ON_READ tables
+
+||Snapshot|Incremental|Read Optimized|
+||--------|-----------|--------------|
+|**Hive**|Y|Y|Y|
+|**Spark SQL**|Y|Y|Y|
+|**Spark datasource**|N|N|Y|
+|**Presto**|N|N|Y|
+
+
+In sections, below we will discuss specific setup to access different query types from different query engines.
 ## Hive
-In order for Hive to recognize Hudi tables and query correctly, the HiveServer2 needs to be provided with the `hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar`
-in its [aux jars path](https://www.cloudera.com/documentation/enterprise/5-6-x/topics/cm_mc_hive_udf.html#concept_nc3_mms_lr). This will ensure the input format
+In order for Hive to recognize Hudi tables and query correctly,
+ - the HiveServer2 needs to be provided with the `hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar` in its [aux jars path](https://www.cloudera.com/documentation/enterprise/5-6-x/topics/cm_mc_hive_udf.html#concept_nc3_mms_lr). This will ensure the input format
 classes with its dependencies are available for query planning & execution.
+ - For MERGE_ON_READ tables, additionally the bundle needs to be put on the hadoop/hive installation across the cluster, so that queries can pick up the custom RecordReader as well.
-### Read optimized query
-In addition to setup above, for beeline cli access, the `hive.input.format` variable needs to be set to the fully qualified path name of the
+In addition to setup above, for beeline cli access, the `hive.input.format` variable needs to be set to the fully qualified path name of the
 inputformat `org.apache.hudi.hadoop.HoodieParquetInputFormat`. For Tez, additionally the `hive.tez.input.format` needs to be set
-to `org.apache.hadoop.hive.ql.io.HiveInputFormat`
-
-### Snapshot query
-In addition to installing the hive bundle jar on the HiveServer2, it needs to be put on the hadoop/hive installation across the cluster, so that
-queries can pick up the custom RecordReader as well.
+to `org.apache.hadoop.hive.ql.io.HiveInputFormat`. Then proceed to query the table like any other Hive table.
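Editorial sketch (not part of the commit): the session settings described in the Hive section above could be applied over JDBC roughly as follows. The two property names and values are taken from the text; the class `HudiHiveSessionSettings` and its methods are hypothetical helpers, and it is an assumption that the connection passed to `apply` is an open HiveServer2 JDBC connection that accepts `set` commands as ordinary statements.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration; not part of Hudi or Hive. */
final class HudiHiveSessionSettings {

    // Values quoted from the Hive section of the documentation change.
    static final String HUDI_INPUT_FORMAT = "org.apache.hudi.hadoop.HoodieParquetInputFormat";
    static final String TEZ_INPUT_FORMAT = "org.apache.hadoop.hive.ql.io.HiveInputFormat";

    /** Builds the `set` statements to issue at the start of a session. */
    static List<String> statements(boolean onTez) {
        List<String> out = new ArrayList<>();
        out.add("set hive.input.format=" + HUDI_INPUT_FORMAT);
        if (onTez) {
            // Only needed when the execution engine is Tez.
            out.add("set hive.tez.input.format=" + TEZ_INPUT_FORMAT);
        }
        return out;
    }

    /** Runs the statements on an already-open connection (assumed to be Hive JDBC). */
    static void apply(Connection conn, boolean onTez) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            for (String s : statements(onTez)) {
                stmt.execute(s);
            }
        }
    }

    public static void main(String[] args) {
        // Print the statements; in beeline each one would be terminated with ';'.
        statements(true).forEach(System.out::println);
    }
}
```

Keeping the statement list separate from the JDBC call makes it possible to reuse the same strings in a beeline init script.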
 ### Incremental query
 `HiveIncrementalPuller` allows incrementally extracting changes from large fact/dimension tables via HiveQL, combining the benefits of Hive (reliably process complex SQL queries) and
-incremental primitives (speed up query by pulling tables incrementally instead of scanning fully). The tool uses Hive JDBC to run the hive query and saves its results in a temp table.
+incremental primitives (speed up querying tables incrementally instead of scanning fully). The tool uses Hive JDBC to run the hive query and saves its results in a temp table.
 that can later be upserted. Upsert utility (`HoodieDeltaStreamer`) has all the state it needs from the directory structure to know what should be the commit time on the target table.
 e.g: `/app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}`.The Delta Hive table registered will be of the form `{tmpdb}.{source_table}_{last_commit_included}`.
@@ -63,16 +81,16 @@ The following are the configuration options for HiveIncrementalPuller
 |tmp| Directory where the temporary delta data is stored in DFS. The directory structure will follow conventions. Please see the below section. | |
 |extractSQLFile| The SQL to execute on the source table to extract the data. The data extracted will be all the rows that changed since a particular point in time. | |
 |sourceTable| Source Table Name. Needed to set hive environment properties. | |
+|sourceDb| Source DB name. Needed to set hive environment properties.| |
 |targetTable| Target Table Name. Needed for the intermediate storage directory structure. | |
-|sourceDataPath| Source DFS Base Path. This is where the Hudi metadata will be read. | |
-|targetDataPath| Target DFS Base path. This is needed to compute the fromCommitTime. This is not needed if fromCommitTime is specified explicitly. | |
+|targetDb| Target table's DB name.| |
 |tmpdb| The database to which the intermediate temp delta table will be created | hoodie_temp |
-|fromCommitTime| This is the most important parameter. This is the point in time from which the changed records are pulled from. | |
-|maxCommits| Number of commits to include in the pull. Setting this to -1 will include all the commits from fromCommitTime. Setting this to a value > 0, will include records that ONLY changed in the specified number of commits after fromCommitTime. This may be needed if you need to catch up say 2 commits at a time. | 3 |
+|fromCommitTime| This is the most important parameter. This is the point in time from which the changed records are queried from. | |
+|maxCommits| Number of commits to include in the query. Setting this to -1 will include all the commits from fromCommitTime. Setting this to a value > 0, will include records that ONLY changed in the specified number of commits after fromCommitTime. This may be needed if you need to catch up say 2 commits at a time. | 3 |
 |help| Utility Help | |
-Setting fromCommitTime=0 and maxCommits=-1 will pull in the entire source table and can be used to initiate backfills. If the target table is a Hudi table,
+Setting fromCommitTime=0 and maxCommits=-1 will fetch the entire source table and can be used to initiate backfills. If the target table is a Hudi table,
 then the utility can determine if the target table has no commits or is behind more than 24 hour (this is configurable), it will automatically use the backfill configuration, since applying the last 24 hours incrementally could take more time than doing a backfill. The current limitation of the tool is the lack of support for self-joining the same table in mixed mode (snapshot and incremental modes).
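Editorial sketch (not part of the commit): the interaction between `fromCommitTime` and `maxCommits` described in the table above can be pictured as selecting a window of commits from the timeline. This is only an illustration of the documented semantics, not the `HiveIncrementalPuller` implementation. The class `CommitWindow` is hypothetical, and it assumes that commit times are timestamp strings that sort lexicographically and that `fromCommitTime` itself is excluded from the window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the fromCommitTime/maxCommits window. */
final class CommitWindow {

    /**
     * @param timeline       commit times in ascending order
     * @param fromCommitTime lower bound, treated as exclusive (assumption)
     * @param maxCommits     -1 for no limit, otherwise the number of commits to include
     * @return the commits whose changes would be part of the incremental query
     */
    static List<String> select(List<String> timeline, String fromCommitTime, int maxCommits) {
        List<String> selected = new ArrayList<>();
        for (String commit : timeline) {
            if (commit.compareTo(fromCommitTime) <= 0) {
                continue; // at or before the starting point: already consumed
            }
            if (maxCommits >= 0 && selected.size() >= maxCommits) {
                break; // window is full
            }
            selected.add(commit);
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> timeline = Arrays.asList("20200301", "20200302", "20200303", "20200304");
        // Backfill-style call: start from 0 with no limit on commits.
        System.out.println(select(timeline, "0", -1));
        // Catch up two commits at a time after the first commit.
        System.out.println(select(timeline, "20200301", 2));
    }
}
```

With `fromCommitTime=0` and `maxCommits=-1` every commit on the timeline falls inside the window, which matches the backfill case described above.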
@@ -84,55 +102,45 @@ using the hive session property for incremental queries: `set hive.fetch.task.co
 would ensure Map Reduce execution is chosen for a Hive query, which combines partitions (comma separated) and calls InputFormat.listStatus() only once with all those partitions.
-## Spark
-
-Spark provides much easier deployment & management of Hudi jars and bundles into jobs/notebooks. At a high level, there are two ways to access Hudi tables in Spark.
-
- - **Hudi DataSource** : Supports Read Optimized, Incremental Pulls similar to how standard datasources (e.g: `spark.read.parquet`) work.
- - **Read as Hive tables** : Supports all three query types, including the snapshot queries, relying on the custom Hudi input formats again like Hive.
+## Spark SQL
+Supports all query types across both Hudi table types, relying on the custom Hudi input formats again like Hive.
+Typically notebook users and spark-shell users leverage spark sql for querying Hudi tables. Please add hudi-spark-bundle
+as described above via --jars or --packages.
- In general, your spark job needs a dependency to `hudi-spark` or `hudi-spark-bundle_2.*-x.y.z.jar` needs to be on the class path of driver & executors (hint: use `--jars` argument)
-
-### Read optimized query
-
-Pushing a path filter into sparkContext as follows allows for read optimized querying of a Hudi hive table using SparkSQL.
-This method retains Spark built-in optimizations for reading Parquet files like vectorized reading on Hudi tables.
-
-```scala
-spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]);
-```
-
-If you prefer to glob paths on DFS via the datasource, you can simply do something like below to get a Spark dataframe to work with.
+### Snapshot query {#spark-snapshot-query}
+By default, Spark SQL will try to use its own parquet reader instead of Hive SerDe when reading from Hive metastore parquet tables.
+However, for MERGE_ON_READ tables which has both parquet and avro data, this default setting needs to be turned off using set `spark.sql.hive.convertMetastoreParquet=false`.
+This will force Spark to fallback to using the Hive Serde to read the data (planning/executions is still Spark).
 ```java
-Dataset<Row> hoodieROViewDF = spark.read().format("org.apache.hudi")
-// pass any path glob, can include hudi & non-hudi tables
-.load("/glob/path/pattern");
+$ spark-shell --driver-class-path /etc/hive/conf --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client
+
+scala> sqlContext.sql("select count(*) from hudi_trips_mor_rt where datestr = '2016-10-02'").show()
+scala> sqlContext.sql("select count(*) from hudi_trips_mor_rt where datestr = '2016-10-02'").show()
 ```
-
-### Snapshot query {#spark-snapshot-query}
-Currently, near-real time data can only be queried as a Hive table in Spark using snapshot query mode. In order to do this, set `spark.sql.hive.convertMetastoreParquet=false`, forcing Spark to fallback
-to using the Hive Serde to read the data (planning/executions is still Spark).
-```java
-$ spark-shell --jars hudi-spark-bundle_2.11-x.y.z-SNAPSHOT.jar --driver-class-path /etc/hive/conf --packages org.apache.spark:spark-avro_2.11:2.4.4 --conf spark.sql.hive.convertMetastoreParquet=false --num-executors 10 --driver-memory 7g --executor-memory 2g --master yarn-client
+For COPY_ON_WRITE tables, either Hive SerDe can be used by turning off `spark.sql.hive.convertMetastoreParquet=false` as described above or Spark's built in support can be leveraged.
+If using spark's built in support, additionally a path filter needs to be pushed into sparkContext as follows. This method retains Spark built-in optimizations for reading parquet files like vectorized reading on Hudi Hive tables.
-scala> sqlContext.sql("select count(*) from hudi_trips_rt where datestr = '2016-10-02'").show()
-scala> sqlContext.sql("select count(*) from hudi_trips_rt where datestr = '2016-10-02'").show()
+```scala
+spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]);
 ```
-### Incremental pulling {#spark-incr-pull}
-The `hudi-spark` module offers the DataSource API, a more elegant way to pull data from Hudi table and process it via Spark.
-A sample incremental pull, that will obtain all records written since `beginInstantTime`, looks like below.
+### Incremental querying {#spark-incr-query}
+Incremental queries work like hive incremental queries. The `hudi-spark` module offers the DataSource API, a more elegant way to query data from Hudi table and process it via Spark.
+A sample incremental query, that will obtain all records written since `beginInstantTime`, looks like below.
 ```java
- Dataset<Row> hoodieIncViewDF = spark.read()
+ Dataset<Row> hudiIncQueryDF = spark.read()
      .format("org.apache.hudi")
-     .option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(),
-             DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
+     .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(),
+             DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(),
             <beginInstantTime>)
-     .load(tablePath); // For incremental view, pass in the root/base path of table
+     .load(tablePath); // For incremental query, pass in the root/base path of table
+
+hudiIncQueryDF.createOrReplaceTempView("hudi_trips_incremental")
+spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show()
 ```
 Please refer to [configurations](/docs/configurations.html#spark-datasource) section, to view all datasource options.
@@ -145,11 +153,18 @@ Additionally, `HoodieReadClient` offers the following functionality using Hudi's
 | filterExists() | Filter out already existing records from the provided RDD[HoodieRecord]. Useful for de-duplication |
 | checkExists(keys) | Check if the provided keys exist in a Hudi table |
+## Spark datasource
+
+Hudi COPY_ON_WRITE tables can be queried via Spark datasource similar to how standard datasources work (e.g: `spark.read.parquet`).
+Both snapshot querying and incremental querying are supported here. Typically spark jobs require adding `--jars <path to jar>/hudi-spark-bundle_2.11:0.5.1-incubating`
+to classpath of drivers and executors. When using spark shell instead of `--jars`, `--packages` can also be used to fetch the hudi-spark-bundle like this: `--packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating`
+For examples, refer to [Setup spark-shell in quickstart](/docs/quick-start-guide.html#setup-spark-shell).
 ## Presto
-Presto is a popular query engine, providing interactive query performance. Presto currently supports only read optimized queries on Hudi tables.
-This requires the `hudi-presto-bundle` jar to be placed into `<presto_install>/plugin/hive-hadoop2/`, across the installation.
+Presto is a popular query engine, providing interactive query performance. Presto currently supports snapshot queries on
+COPY_ON_WRITE and read optimized queries on MERGE_ON_READ Hudi tables. This requires the `hudi-presto-bundle` jar
+to be placed into `<presto_install>/plugin/hive-hadoop2/`, across the installation.
 ## Impala(Not Officially Released)