nsivabalan commented on a change in pull request #2783: URL: https://github.com/apache/hudi/pull/2783#discussion_r608886318
########## File path: docs/_docs/0.8.0/1_1_spark_quick_start_guide.md ########## @@ -0,0 +1,530 @@
+---
+version: 0.8.0
+title: "Quick-Start Guide"
+permalink: /docs/spark_quick-start-guide.html
+toc: true
+last_modified_at: 2019-12-30T15:59:57-04:00
+---
+
+This guide provides a quick peek at Hudi's capabilities using spark-shell. Using Spark datasources, we will walk through
+code snippets that allow you to insert and update a Hudi table of the default table type:
+[Copy on Write](/docs/concepts.html#copy-on-write-table).
+After each write operation we will also show how to read the data both as a snapshot and incrementally.
+
+# Scala example
+
+## Setup
+
+Hudi works with Spark 2.x & Spark 3.x versions. You can follow the instructions [here](https://spark.apache.org/downloads.html) for setting up Spark.

Review comment: fix min versions here for spark2

########## File path: docs/_docs/0.8.0/0_3_migration_guide.md ########## @@ -0,0 +1,72 @@
+---
+version: 0.8.0
+title: Migration Guide
+keywords: hudi, migration, use case
+permalink: /docs/migration_guide.html
+summary: In this page, we will discuss some available tools for migrating your existing table into a Hudi table
+last_modified_at: 2019-12-30T15:59:57-04:00
+---
+
+Hudi maintains metadata such as the commit timeline and indexes to manage a table. The commit timeline helps to understand the actions happening on a table as well as the current state of a table. Indexes are used by Hudi to maintain a record key to file id mapping, to efficiently locate a record. At the moment, Hudi supports writing only the Parquet columnar format.
+To be able to start using Hudi for your existing table, you will need to migrate your existing table into a Hudi managed table. There are a couple of ways to achieve this.
+
+
+## Approaches
+
+
+### Use Hudi for new partitions alone
+
+Hudi can be used to manage an existing table without affecting/altering the historical data already present in the
+table.
Hudi has been implemented to be compatible with such a mixed table, with the caveat that a Hive partition is either completely Hudi managed or not managed at all. Thus, the lowest granularity at which Hudi manages a table is a Hive partition. Start using the datasource API or the WriteClient to write to the table, and make sure you either start writing to a new partition or convert your last N partitions into Hudi, instead of converting the entire table. Note that since the historical partitions are not managed by Hudi, none of the primitives provided by Hudi work on the data in those partitions; more concretely, you cannot perform upserts or incremental pulls on such older partitions not managed by the Hudi table.
+Take this approach if your table is an append-only type of table and you do not expect to perform any updates to existing (non Hudi managed) partitions.
+
+
+### Convert existing table to Hudi
+
+Import your existing table into a Hudi managed table. Since all the data is Hudi managed, none of the limitations of Approach 1 apply here. Updates spanning any partitions can be applied to this table, and Hudi will efficiently make the updates available to queries. Note that not only do you get to use all the Hudi primitives on this table, there are additional advantages to doing this. Hudi automatically manages the file sizes of a Hudi managed table: you can define the desired file size when converting the table, and Hudi will ensure it writes out files adhering to that config. It will also ensure that smaller files later get corrected by routing some new inserts into small files rather than writing new small ones, thus maintaining the health of your cluster.
+
+There are a few options when choosing this approach.
+
+**Option 1**

Review comment: shouldn't we also briefly talk about bootstrap here as one of the options?
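As a supplementary illustration of Approach 1's constraint (a Hive partition is either wholly Hudi managed or not), here is a plain-Python sketch of the routing a writer has to respect. The helper and field names (`split_batch`, `partitionpath`) are hypothetical, for illustration only — this is not a Hudi API.

```python
# Hypothetical sketch of Approach 1: Hudi manages whole Hive partitions,
# so writes must not target legacy (non Hudi managed) partitions.
def split_batch(records, legacy_partitions):
    """Split an incoming batch into Hudi-writable and rejected records.

    records: iterable of dicts carrying a 'partitionpath' field.
    legacy_partitions: set of partition paths still managed outside Hudi.
    """
    writable, rejected = [], []
    for rec in records:
        if rec["partitionpath"] in legacy_partitions:
            rejected.append(rec)   # no upsert/incremental pull on legacy data
        else:
            writable.append(rec)   # new (or converted) partition: Hudi managed
    return writable, rejected

legacy = {"2019/01/01", "2019/01/02"}   # partitions not yet converted
batch = [
    {"uuid": "a", "partitionpath": "2019/01/02"},  # lands in a legacy partition
    {"uuid": "b", "partitionpath": "2019/01/03"},  # lands in a new partition
]
ok, bad = split_batch(batch, legacy)
```

Converting the last N partitions simply shrinks `legacy_partitions`, which is why an append-only workload fits this approach well.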
########## File path: docs/_docs/0.8.0/1_1_spark_quick_start_guide.md ########## @@ -0,0 +1,530 @@ +--- +version: 0.8.0 +title: "Quick-Start Guide" +permalink: /docs/spark_quick-start-guide.html +toc: true +last_modified_at: 2019-12-30T15:59:57-04:00 +--- + +This guide provides a quick peek at Hudi's capabilities using spark-shell. Using Spark datasources, we will walk through +code snippets that allows you to insert and update a Hudi table of default table type: +[Copy on Write](/docs/concepts.html#copy-on-write-table). +After each write operation we will also show how to read the data both snapshot and incrementally. +# Scala example + +## Setup + +Hudi works with Spark-2.x & Spark 3.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. +From the extracted directory run spark-shell with Hudi as: + +```scala +// spark-shell +spark-shell \ + --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \ + --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' +``` + +<div class="notice--info"> + <h4>Please note the following: </h4> +<ul> + <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li> + <li>spark-avro and spark versions must match (we have used 3.0.1 for both above)</li> + <li>we have used hudi-spark-bundle built for scala 2.12 since the spark-avro module used also depends on 2.12. + If spark-avro_2.11 is used, correspondingly hudi-spark-bundle_2.11 needs to be used. 
</li> Review comment: add bundle option for spark3 ########## File path: docs/_docs/0.8.0/2_1_overview.md ########## @@ -0,0 +1,174 @@ +--- +version: 0.8.0 +title: "Overview" +keywords: hudi, design, table, queries, timeline +permalink: /docs/overview.html +summary: "Here we introduce some basic concepts & give a broad technical overview of Hudi" +toc: true +last_modified_at: 2019-12-30T15:59:57-04:00 +--- + +Apache Hudi (pronounced “hoodie”) provides streaming primitives over hadoop compatible storages + + * Update/Delete Records (how do I change records in a table?) + * Change Streams (how do I fetch records that changed?) + +In this section, we will discuss key concepts & terminologies that are important to understand, to be able to effectively use these primitives. + +## Timeline +At its core, Hudi maintains a `timeline` of all actions performed on the table at different `instants` of time that helps provide instantaneous views of the table, +while also efficiently supporting retrieval of data in the order of arrival. A Hudi instant consists of the following components + + * `Instant action` : Type of action performed on the table + * `Instant time` : Instant time is typically a timestamp (e.g: 20190117010349), which monotonically increases in the order of action's begin time. + * `state` : current state of the instant + +Hudi guarantees that the actions performed on the timeline are atomic & timeline consistent based on the instant time. + +Key actions performed include + + * `COMMITS` - A commit denotes an **atomic write** of a batch of records into a table. + * `CLEANS` - Background activity that gets rid of older versions of files in the table, that are no longer needed. + * `DELTA_COMMIT` - A delta commit refers to an **atomic write** of a batch of records into a MergeOnRead type table, where some/all of the data could be just written to delta logs. 
+ * `COMPACTION` - Background activity to reconcile differential data structures within Hudi e.g: moving updates from row based log files to columnar formats. Internally, compaction manifests as a special commit on the timeline + * `ROLLBACK` - Indicates that a commit/delta commit was unsuccessful & rolled back, removing any partial files produced during such a write Review comment: @n3nash @satishkotha : not required in this patch. but do we need to revisit this section and talk about cluster, insert_overwrite etc? ########## File path: docs/_docs/0.8.0/1_1_spark_quick_start_guide.md ########## @@ -0,0 +1,530 @@ +--- +version: 0.8.0 +title: "Quick-Start Guide" +permalink: /docs/spark_quick-start-guide.html +toc: true +last_modified_at: 2019-12-30T15:59:57-04:00 +--- + +This guide provides a quick peek at Hudi's capabilities using spark-shell. Using Spark datasources, we will walk through +code snippets that allows you to insert and update a Hudi table of default table type: +[Copy on Write](/docs/concepts.html#copy-on-write-table). +After each write operation we will also show how to read the data both snapshot and incrementally. +# Scala example + +## Setup + +Hudi works with Spark-2.x & Spark 3.x versions. You can follow instructions [here](https://spark.apache.org/downloads.html) for setting up spark. +From the extracted directory run spark-shell with Hudi as: + +```scala +// spark-shell +spark-shell \ + --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \ + --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' +``` + +<div class="notice--info"> + <h4>Please note the following: </h4> +<ul> + <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li> + <li>spark-avro and spark versions must match (we have used 3.0.1 for both above)</li> + <li>we have used hudi-spark-bundle built for scala 2.12 since the spark-avro module used also depends on 2.12. 
+ If spark-avro_2.11 is used, correspondingly hudi-spark-bundle_2.11 needs to be used. </li>
+</ul>
+</div>
+
+Set up the table name, base path and a data generator to generate records for this guide.
+
+```scala
+// spark-shell
+import org.apache.hudi.QuickstartUtils._
+import scala.collection.JavaConversions._
+import org.apache.spark.sql.SaveMode._
+import org.apache.hudi.DataSourceReadOptions._
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.config.HoodieWriteConfig._
+
+val tableName = "hudi_trips_cow"
+val basePath = "file:///tmp/hudi_trips_cow"
+val dataGen = new DataGenerator
+```
+
+The [DataGenerator](https://github.com/apache/hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L50)
+can generate sample inserts and updates based on the sample trip schema [here](https://github.com/apache/hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L57)
+{: .notice--info}
+
+
+## Insert data
+
+Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.
+
+```scala
+// spark-shell
+val inserts = convertToStringList(dataGen.generateInserts(10))
+val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
+df.write.format("hudi").
+  options(getQuickstartWriteConfigs).
+  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
+  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
+  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
+  option(TABLE_NAME, tableName).
+  mode(Overwrite).
+  save(basePath)
+```
+
+`mode(Overwrite)` overwrites and recreates the table if it already exists.
+You can check the data generated under `/tmp/hudi_trips_cow/<region>/<country>/<city>/`.
We provided a record key +(`uuid` in [schema](https://github.com/apache/hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)), partition field (`region/country/city`) and combine logic (`ts` in +[schema](https://github.com/apache/hudi/blob/master/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java#L58)) to ensure trip records are unique within each partition. For more info, refer to +[Modeling data stored in Hudi](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=113709185#FAQ-HowdoImodelthedatastoredinHudi) +and for info on ways to ingest data into Hudi, refer to [Writing Hudi Tables](/docs/writing_data.html). +Here we are using the default write operation : `upsert`. If you have a workload without updates, you can also issue +`insert` or `bulk_insert` operations which could be faster. To know more, refer to [Write operations](/docs/writing_data#write-operations) +{: .notice--info} + +## Query data + +Load the data files into a DataFrame. + +```scala +// spark-shell +val tripsSnapshotDF = spark. + read. + format("hudi"). + load(basePath + "/*/*/*/*") +//load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery +tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot") + +spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show() +spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show() +``` + +This query provides snapshot querying of the ingested data. Since our partition path (`region/country/city`) is 3 levels nested +from base path we ve used `load(basePath + "/*/*/*/*")`. +Refer to [Table types and queries](/docs/concepts#table-types--queries) for more info on all table types and query types supported. +{: .notice--info} + +## Update data + +This is similar to inserting new data. 
Generate updates to existing trips using the data generator, load into a DataFrame +and write DataFrame into the hudi table. + +```scala +// spark-shell +val updates = convertToStringList(dataGen.generateUpdates(10)) +val df = spark.read.json(spark.sparkContext.parallelize(updates, 2)) +df.write.format("hudi"). + options(getQuickstartWriteConfigs). + option(PRECOMBINE_FIELD_OPT_KEY, "ts"). + option(RECORDKEY_FIELD_OPT_KEY, "uuid"). + option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). + option(TABLE_NAME, tableName). + mode(Append). + save(basePath) +``` + +Notice that the save mode is now `Append`. In general, always use append mode unless you are trying to create the table for the first time. +[Querying](#query-data) the data again will now show updated trips. Each write operation generates a new [commit](/docs/concepts.html) +denoted by the timestamp. Look for changes in `_hoodie_commit_time`, `rider`, `driver` fields for the same `_hoodie_record_key`s in previous commit. +{: .notice--info} + +## Incremental query + +Hudi also provides capability to obtain a stream of records that changed since given commit timestamp. +This can be achieved using Hudi's incremental querying and providing a begin time from which changes need to be streamed. +We do not need to specify endTime, if we want all changes after the given commit (as is the common case). + +```scala +// spark-shell +// reload data +spark. + read. + format("hudi"). + load(basePath + "/*/*/*/*"). + createOrReplaceTempView("hudi_trips_snapshot") + +val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50) +val beginTime = commits(commits.length - 2) // commit time we are interested in + +// incrementally query data +val tripsIncrementalDF = spark.read.format("hudi"). + option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). + option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). 
+ load(basePath) +tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental") + +spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_incremental where fare > 20.0").show() +``` + +This will give all changes that happened after the beginTime commit with the filter of fare > 20.0. The unique thing about this +feature is that it now lets you author streaming pipelines on batch data. +{: .notice--info} + +## Point in time query + +Lets look at how to query data as of a specific time. The specific time can be represented by pointing endTime to a +specific commit time and beginTime to "000" (denoting earliest possible commit time). + +```scala +// spark-shell +val beginTime = "000" // Represents all commits > this time. +val endTime = commits(commits.length - 2) // commit time we are interested in + +//incrementally query data +val tripsPointInTimeDF = spark.read.format("hudi"). + option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL). + option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). + option(END_INSTANTTIME_OPT_KEY, endTime). + load(basePath) +tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time") +spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show() +``` + +## Delete data {#deletes} +Delete records for the HoodieKeys passed in. + +```scala +// spark-shell +// fetch total records count +spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() +// fetch two records to be deleted +val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2) + +// issue deletes +val deletes = dataGen.generateDeletes(ds.collectAsList()) +val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2)) + +df.write.format("hudi"). + options(getQuickstartWriteConfigs). + option(OPERATION_OPT_KEY,"delete"). + option(PRECOMBINE_FIELD_OPT_KEY, "ts"). + option(RECORDKEY_FIELD_OPT_KEY, "uuid"). 
+ option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). + option(TABLE_NAME, tableName). + mode(Append). + save(basePath) + +// run the same read query as above. +val roAfterDeleteViewDF = spark. + read. + format("hudi"). + load(basePath + "/*/*/*/*") + +roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot") +// fetch should return (total - 2) records +spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count() +``` +Note: Only `Append` mode is supported for delete operation. + +See the [deletion section](/docs/writing_data.html#deletes) of the writing data page for more details. + +## Insert Overwrite Table + +Generate some new trips, overwrite the table logically at the Hudi metadata level. The Hudi cleaner will eventually +clean up the previous table snapshot's file groups. This can be faster than deleting the older table and recreating +in `Overwrite` mode. + +```scala +// spark-shell +spark. + read.format("hudi"). + load(basePath + "/*/*/*/*"). + select("uuid","partitionpath"). + show(10, false) + +val inserts = convertToStringList(dataGen.generateInserts(10)) +val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2)) +df.write.format("hudi"). + options(getQuickstartWriteConfigs). + option(OPERATION_OPT_KEY,"insert_overwrite_table"). + option(PRECOMBINE_FIELD_OPT_KEY, "ts"). + option(RECORDKEY_FIELD_OPT_KEY, "uuid"). + option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). + option(TABLE_NAME, tableName). + mode(Append). + save(basePath) + +// Should have different keys now, from query before. +spark. + read.format("hudi"). + load(basePath + "/*/*/*/*"). + select("uuid","partitionpath"). + show(10, false) + +``` + +## Insert Overwrite + +Generate some new trips, overwrite the all the partitions that are present in the input. This operation can be faster +than `upsert` for batch ETL jobs, that are recomputing entire target partitions at once (as opposed to incrementally +updating the target tables). 
This is because, we are able to bypass indexing, precombining and other repartitioning +steps in the upsert write path completely. + +```scala +// spark-shell +spark. + read.format("hudi"). + load(basePath + "/*/*/*/*"). + select("uuid","partitionpath"). + sort("partitionpath","uuid"). + show(100, false) + +val inserts = convertToStringList(dataGen.generateInserts(10)) +val df = spark. + read.json(spark.sparkContext.parallelize(inserts, 2)). + filter("partitionpath = 'americas/united_states/san_francisco'") +df.write.format("hudi"). + options(getQuickstartWriteConfigs). + option(OPERATION_OPT_KEY,"insert_overwrite"). + option(PRECOMBINE_FIELD_OPT_KEY, "ts"). + option(RECORDKEY_FIELD_OPT_KEY, "uuid"). + option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath"). + option(TABLE_NAME, tableName). + mode(Append). + save(basePath) + +// Should have different keys now for San Francisco alone, from query before. +spark. + read.format("hudi"). + load(basePath + "/*/*/*/*"). + select("uuid","partitionpath"). + sort("partitionpath","uuid"). + show(100, false) +``` + +# Pyspark example +## Setup + +Examples below illustrate the same flow above, instead using PySpark. + +```python +# pyspark +export PYSPARK_PYTHON=$(which python3) +pyspark \ + --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \ + --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' +``` + +<div class="notice--info"> + <h4>Please note the following: </h4> +<ul> + <li>spark-avro module needs to be specified in --packages as it is not included with spark-shell by default</li> + <li>spark-avro and spark versions must match (we have used 3.0.1 for both above)</li> + <li>we have used hudi-spark-bundle built for scala 2.12 since the spark-avro module used also depends on 2.12. + If spark-avro_2.11 is used, correspondingly hudi-spark-bundle_2.11 needs to be used. 
</li> Review comment: do remind here about spark3 bundle ########## File path: docs/_docs/0.8.0/0_3_migration_guide.md ########## @@ -0,0 +1,72 @@ +--- +version: 0.8.0 +title: Migration Guide +keywords: hudi, migration, use case +permalink: /docs/migration_guide.html +summary: In this page, we will discuss some available tools for migrating your existing table into a Hudi table +last_modified_at: 2019-12-30T15:59:57-04:00 +--- + +Hudi maintains metadata such as commit timeline and indexes to manage a table. The commit timelines helps to understand the actions happening on a table as well as the current state of a table. Indexes are used by Hudi to maintain a record key to file id mapping to efficiently locate a record. At the moment, Hudi supports writing only parquet columnar formats. Review comment: is this something new you add w/ 0.8.0 or is this a copy of already existing content? this is definitely useful. just wanted to see if I have to do line by line reviews ? ########## File path: docs/_docs/0.8.0/2_1_overview.md ########## @@ -0,0 +1,174 @@ +--- +version: 0.8.0 +title: "Overview" +keywords: hudi, design, table, queries, timeline +permalink: /docs/overview.html +summary: "Here we introduce some basic concepts & give a broad technical overview of Hudi" +toc: true +last_modified_at: 2019-12-30T15:59:57-04:00 +--- + +Apache Hudi (pronounced “hoodie”) provides streaming primitives over hadoop compatible storages + + * Update/Delete Records (how do I change records in a table?) + * Change Streams (how do I fetch records that changed?) + +In this section, we will discuss key concepts & terminologies that are important to understand, to be able to effectively use these primitives. + +## Timeline +At its core, Hudi maintains a `timeline` of all actions performed on the table at different `instants` of time that helps provide instantaneous views of the table, +while also efficiently supporting retrieval of data in the order of arrival. 
A Hudi instant consists of the following components:
+
+ * `Instant action` : Type of action performed on the table
+ * `Instant time` : Instant time is typically a timestamp (e.g. 20190117010349), which monotonically increases in the order of the action's begin time.
+ * `state` : Current state of the instant
+
+Hudi guarantees that the actions performed on the timeline are atomic & timeline consistent, based on the instant time.
+
+Key actions performed include:
+
+ * `COMMITS` - A commit denotes an **atomic write** of a batch of records into a table.
+ * `CLEANS` - Background activity that gets rid of older versions of files in the table that are no longer needed.
+ * `DELTA_COMMIT` - A delta commit refers to an **atomic write** of a batch of records into a MergeOnRead type table, where some/all of the data could just be written to delta logs.
+ * `COMPACTION` - Background activity to reconcile differential data structures within Hudi, e.g. moving updates from row-based log files to columnar formats. Internally, compaction manifests as a special commit on the timeline.
+ * `ROLLBACK` - Indicates that a commit/delta commit was unsuccessful & rolled back, removing any partial files produced during such a write.
+ * `SAVEPOINT` - Marks certain file groups as "saved", such that the cleaner will not delete them. It helps restore the table to a point on the timeline, in case of disaster/data recovery scenarios.
+
+Any given instant can be in one of the following states:
+
+ * `REQUESTED` - Denotes an action has been scheduled, but has not been initiated

Review comment: same here.
if there are any new additions here @n3nash

########## File path: docs/_docs/0.8.0/0_3_migration_guide.md ########## @@ -0,0 +1,72 @@
+---
+version: 0.8.0
+title: Migration Guide
+keywords: hudi, migration, use case
+permalink: /docs/migration_guide.html
+summary: In this page, we will discuss some available tools for migrating your existing table into a Hudi table
+last_modified_at: 2019-12-30T15:59:57-04:00
+---
+
+Hudi maintains metadata such as the commit timeline and indexes to manage a table. The commit timeline helps to understand the actions happening on a table as well as the current state of a table. Indexes are used by Hudi to maintain a record key to file id mapping, to efficiently locate a record. At the moment, Hudi supports writing only the Parquet columnar format.

Review comment: May I know from where this page is linked? https://hudi.apache.org/docs/migration_guide.html in other words, how to navigate to this page from our home page.

########## File path: docs/_docs/0.8.0/2_4_configurations.md ########## @@ -0,0 +1,890 @@
+---
+version: 0.8.0
+title: Configurations
+keywords: garbage collection, hudi, jvm, configs, tuning
+permalink: /docs/configurations.html
+summary: "Here we list all possible configurations and what they mean"
+toc: true
+last_modified_at: 2019-12-30T15:59:57-04:00
+---
+
+This page covers the different ways of configuring your job to write/read Hudi tables.
+At a high level, you can control behaviour at a few levels.
+
+- **[Spark Datasource Configs](#spark-datasource)** : These configs control the Hudi Spark Datasource, providing the ability to define keys/partitioning, pick out the write operation, specify how to merge records, or choose the query type to read.
+- **[Flink SQL Configs](#flink-options)** : These configs control the Hudi Flink SQL source/sink connectors, providing the ability to define record keys, pick out the write operation, specify how to merge records, enable/disable asynchronous compaction, or choose the query type to read.
+- **[WriteClient Configs](#writeclient-configs)** : Internally, the Hudi datasource uses an RDD-based `HoodieWriteClient` API to actually perform writes to storage. These configs provide deep control over lower-level aspects like file sizing, compression, parallelism, compaction, write schema, cleaning, etc. Although Hudi provides sane defaults, from time to time these configs may need to be tweaked to optimize for specific workloads.
+- **[RecordPayload Config](#PAYLOAD_CLASS_OPT_KEY)** : This is the lowest level of customization offered by Hudi. Record payloads define how to produce new values to upsert, based on an incoming new record and a stored old record. Hudi provides default implementations such as `OverwriteWithLatestAvroPayload`, which simply updates the table with the latest/last-written record. This can be overridden with a custom class extending the `HoodieRecordPayload` class, at both the datasource and WriteClient levels.
+
+
+## Spark Datasource Configs {#spark-datasource}
+
+Spark jobs using the datasource can be configured by passing the below options into the `option(k,v)` method as usual.
+The actual datasource level configs are listed below.
+
+
+### Write Options
+
+Additionally, you can pass down any of the WriteClient level configs directly using the `options()` or `option(k,v)` methods.
+ +```java +inputDF.write() +.format("org.apache.hudi") +.options(clientOpts) // any of the Hudi client opts can be passed in as well +.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key") +.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition") +.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp") +.option(HoodieWriteConfig.TABLE_NAME, tableName) +.mode(SaveMode.Append) +.save(basePath); +``` + +Options useful for writing tables via `write.format.option(...)` + +#### TABLE_NAME_OPT_KEY {#TABLE_NAME_OPT_KEY} + Property: `hoodie.datasource.write.table.name` [Required]<br/> + <span style="color:grey">Hive table name, to register the table into.</span> + +#### OPERATION_OPT_KEY {#OPERATION_OPT_KEY} + Property: `hoodie.datasource.write.operation`, Default: `upsert`<br/> + <span style="color:grey">whether to do upsert, insert or bulkinsert for the write operation. Use `bulkinsert` to load new data into a table, and there on use `upsert`/`insert`. + bulk insert uses a disk based write path to scale to load large inputs without need to cache it.</span> + +#### TABLE_TYPE_OPT_KEY {#TABLE_TYPE_OPT_KEY} + Property: `hoodie.datasource.write.table.type`, Default: `COPY_ON_WRITE` <br/> + <span style="color:grey">The table type for the underlying data, for this write. This can't change between writes.</span> + +#### PRECOMBINE_FIELD_OPT_KEY {#PRECOMBINE_FIELD_OPT_KEY} + Property: `hoodie.datasource.write.precombine.field`, Default: `ts` <br/> + <span style="color:grey">Field used in preCombining before actual write. When two records have the same key value, +we will pick the one with the largest value for the precombine field, determined by Object.compareTo(..)</span> + +#### PAYLOAD_CLASS_OPT_KEY {#PAYLOAD_CLASS_OPT_KEY} + Property: `hoodie.datasource.write.payload.class`, Default: `org.apache.hudi.OverwriteWithLatestAvroPayload` <br/> + <span style="color:grey">Payload class used. 
Override this if you'd like to roll your own merge logic when upserting/inserting.
+ This will render any value set for `PRECOMBINE_FIELD_OPT_VAL` ineffective</span>
+
+#### RECORDKEY_FIELD_OPT_KEY {#RECORDKEY_FIELD_OPT_KEY}
+ Property: `hoodie.datasource.write.recordkey.field`, Default: `uuid` <br/>
+ <span style="color:grey">Record key field. Value to be used as the `recordKey` component of `HoodieKey`. The actual value
+will be obtained by invoking .toString() on the field value. Nested fields can be specified using
+the dot notation, e.g. `a.b.c`</span>
+
+#### PARTITIONPATH_FIELD_OPT_KEY {#PARTITIONPATH_FIELD_OPT_KEY}
+ Property: `hoodie.datasource.write.partitionpath.field`, Default: `partitionpath` <br/>
+ <span style="color:grey">Partition path field. Value to be used as the `partitionPath` component of `HoodieKey`.
+The actual value is obtained by invoking .toString()</span>
+
+#### HIVE_STYLE_PARTITIONING_OPT_KEY {#HIVE_STYLE_PARTITIONING_OPT_KEY}
+ Property: `hoodie.datasource.write.hive_style_partitioning`, Default: `false` <br/>
+ <span style="color:grey">When set to true, partition folder names follow the format of Hive partitions: <partition_column_name>=<partition_value></span>
+
+#### KEYGENERATOR_CLASS_OPT_KEY {#KEYGENERATOR_CLASS_OPT_KEY}
+ Property: `hoodie.datasource.write.keygenerator.class`, Default: `org.apache.hudi.keygen.SimpleKeyGenerator` <br/>
+ <span style="color:grey">Key generator class that will extract the key out of the incoming `Row` object</span>
+
+#### COMMIT_METADATA_KEYPREFIX_OPT_KEY {#COMMIT_METADATA_KEYPREFIX_OPT_KEY}
+ Property: `hoodie.datasource.write.commitmeta.key.prefix`, Default: `_` <br/>
+ <span style="color:grey">Option keys beginning with this prefix are automatically added to the commit/deltacommit metadata.
+This is useful to store checkpointing information, in a consistent way with the hudi timeline</span> + +#### INSERT_DROP_DUPS_OPT_KEY {#INSERT_DROP_DUPS_OPT_KEY} + Property: `hoodie.datasource.write.insert.drop.duplicates`, Default: `false` <br/> + <span style="color:grey">If set to true, filters out all duplicate records from incoming dataframe, during insert operations. </span> + +#### HIVE_SYNC_ENABLED_OPT_KEY {#HIVE_SYNC_ENABLED_OPT_KEY} + Property: `hoodie.datasource.hive_sync.enable`, Default: `false` <br/> + <span style="color:grey">When set to true, register/sync the table to Apache Hive metastore</span> + +#### HIVE_DATABASE_OPT_KEY {#HIVE_DATABASE_OPT_KEY} + Property: `hoodie.datasource.hive_sync.database`, Default: `default` <br/> + <span style="color:grey">database to sync to</span> + +#### HIVE_TABLE_OPT_KEY {#HIVE_TABLE_OPT_KEY} + Property: `hoodie.datasource.hive_sync.table`, [Required] <br/> + <span style="color:grey">table to sync to</span> + +#### HIVE_USER_OPT_KEY {#HIVE_USER_OPT_KEY} + Property: `hoodie.datasource.hive_sync.username`, Default: `hive` <br/> + <span style="color:grey">hive user name to use</span> + +#### HIVE_PASS_OPT_KEY {#HIVE_PASS_OPT_KEY} + Property: `hoodie.datasource.hive_sync.password`, Default: `hive` <br/> + <span style="color:grey">hive password to use</span> + +#### HIVE_URL_OPT_KEY {#HIVE_URL_OPT_KEY} + Property: `hoodie.datasource.hive_sync.jdbcurl`, Default: `jdbc:hive2://localhost:10000` <br/> + <span style="color:grey">Hive metastore url</span> + +#### HIVE_PARTITION_FIELDS_OPT_KEY {#HIVE_PARTITION_FIELDS_OPT_KEY} + Property: `hoodie.datasource.hive_sync.partition_fields`, Default: ` ` <br/> + <span style="color:grey">field in the table to use for determining hive partition columns.</span> + +#### HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY {#HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY} + Property: `hoodie.datasource.hive_sync.partition_extractor_class`, Default: `org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor` 
<br/> + <span style="color:grey">Class used to extract partition field values into Hive partition columns.</span> + +#### HIVE_ASSUME_DATE_PARTITION_OPT_KEY {#HIVE_ASSUME_DATE_PARTITION_OPT_KEY} + Property: `hoodie.datasource.hive_sync.assume_date_partitioning`, Default: `false` <br/> + <span style="color:grey">Assume partitioning is yyyy/mm/dd</span> + +#### HIVE_USE_JDBC_OPT_KEY {#HIVE_USE_JDBC_OPT_KEY} + Property: `hoodie.datasource.hive_sync.use_jdbc`, Default: `true` <br/> + <span style="color:grey">Use JDBC when Hive synchronization is enabled</span> + +#### HIVE_AUTO_CREATE_DATABASE_OPT_KEY {#HIVE_AUTO_CREATE_DATABASE_OPT_KEY} +Property: `hoodie.datasource.hive_sync.auto_create_database`, Default: `true` <br/> +<span style="color:grey"> Auto-create the Hive database if it does not exist </span> + +#### HIVE_SKIP_RO_SUFFIX {#HIVE_SKIP_RO_SUFFIX} +Property: `hoodie.datasource.hive_sync.skip_ro_suffix`, Default: `false` <br/> +<span style="color:grey"> Skip the `_ro` suffix for the read-optimized table when registering</span> + +#### HIVE_SUPPORT_TIMESTAMP {#HIVE_SUPPORT_TIMESTAMP} +Property: `hoodie.datasource.hive_sync.support_timestamp`, Default: `false` <br/> +<span style="color:grey"> 'INT64' with original type TIMESTAMP_MICROS is converted to Hive's 'timestamp' type. Disabled by default for backward compatibility.
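+
+As an illustration, the Hive sync options above could be wired into a Spark datasource write as in the following sketch (in the spark-shell style of the quick-start guide; `df`, `basePath` and the table/field names are hypothetical):
+
+```scala
+// Sketch: write a Hudi table and sync it to the Hive metastore.
+// Option keys are the ones documented above; values are illustrative.
+import org.apache.spark.sql.SaveMode.Append
+
+df.write.format("hudi").
+  option("hoodie.table.name", "hudi_trips").
+  option("hoodie.datasource.hive_sync.enable", "true").
+  option("hoodie.datasource.hive_sync.database", "default").
+  option("hoodie.datasource.hive_sync.table", "hudi_trips").  // [Required]
+  option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://localhost:10000").
+  option("hoodie.datasource.hive_sync.partition_fields", "partitionpath").
+  mode(Append).
+  save(basePath)
+```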
</span> + +### Read Options + +Options useful for reading tables via `read.format.option(...)` + +#### QUERY_TYPE_OPT_KEY {#QUERY_TYPE_OPT_KEY} +Property: `hoodie.datasource.query.type`, Default: `snapshot` <br/> +<span style="color:grey">Whether data needs to be read in incremental mode (new data since an instantTime), +Read Optimized mode (obtain latest view, based on columnar data), +or Snapshot mode (obtain latest view, based on row & columnar data)</span> + +#### BEGIN_INSTANTTIME_OPT_KEY {#BEGIN_INSTANTTIME_OPT_KEY} +Property: `hoodie.datasource.read.begin.instanttime`, [Required in incremental mode] <br/> +<span style="color:grey">Instant time to start incrementally pulling data from. The instant time here need not +necessarily correspond to an instant on the timeline. New data written with an + `instant_time > BEGIN_INSTANTTIME` is fetched. For example, '20170901080000' will get + all new data written after Sep 1, 2017 08:00 AM.</span> + +#### END_INSTANTTIME_OPT_KEY {#END_INSTANTTIME_OPT_KEY} +Property: `hoodie.datasource.read.end.instanttime`, Default: latest instant (i.e fetches all new data since begin instant time) <br/> +<span style="color:grey"> Instant time to limit incrementally fetched data to. New data written with an +`instant_time <= END_INSTANTTIME` is fetched.</span> + +#### INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY {#INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME_OPT_KEY} +Property: `hoodie.datasource.read.schema.use.end.instanttime`, Default: `false` <br/> +<span style="color:grey"> Uses the end instant's schema when incrementally fetching data. By default, the latest instant's schema is used. </span> + +## Flink SQL Config Options {#flink-options} + +Flink jobs using SQL can be configured through the options in the `WITH` clause. +The actual datasource level configs are listed below.
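+
+For instance, a table definition passing some of these options through the `WITH` clause might look like the following sketch (the table name, schema, and path are hypothetical; option keys are taken from the tables below):
+
+```sql
+CREATE TABLE hudi_trips(
+  uuid VARCHAR(40),
+  name VARCHAR(10),
+  ts TIMESTAMP(3),
+  `partition` VARCHAR(20)
+)
+PARTITIONED BY (`partition`)
+WITH (
+  'connector' = 'hudi',
+  'path' = 'hdfs:///tmp/hudi_trips',        -- required: base path of the table
+  'table.type' = 'MERGE_ON_READ',           -- default is COPY_ON_WRITE
+  'write.operation' = 'upsert',
+  'write.precombine.field' = 'ts'
+)
+```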
+ +### Write Options + +| Option Name | Required | Default | Remarks | +| ----------- | ------- | ------- | ------- | +| `path` | Y | N/A | <span style="color:grey"> Base path for the target hoodie table. The path is created if it does not exist; otherwise a Hudi table is expected to have been initialized successfully </span> | +| `table.type` | N | COPY_ON_WRITE | <span style="color:grey"> Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ </span> | +| `write.operation` | N | upsert | <span style="color:grey"> The write operation that this write should do (insert or upsert is supported) </span> | +| `write.precombine.field` | N | ts | <span style="color:grey"> Field used in pre-combining before the actual write. When two records have the same key value, we pick the one with the largest value for the precombine field, determined by Object.compareTo(..) </span> | +| `write.payload.class` | N | OverwriteWithLatestAvroPayload.class | <span style="color:grey"> Payload class to use. Override this if you'd like to roll your own merge logic when upserting/inserting. This renders any value set for `write.precombine.field` ineffective </span> | +| `write.insert.drop.duplicates` | N | false | <span style="color:grey"> Flag to indicate whether to drop duplicates upon insert. By default, insert accepts duplicates to gain extra performance </span> | +| `write.ignore.failed` | N | true | <span style="color:grey"> Flag to indicate whether to ignore any non-exception error (e.g. writestatus error) within a checkpoint batch. Defaults to true (in favor of streaming progressing over data integrity) </span> | +| `hoodie.datasource.write.recordkey.field` | N | uuid | <span style="color:grey"> Record key field. Value to be used as the `recordKey` component of `HoodieKey`. The actual value is obtained by invoking .toString() on the field value.
Nested fields can be specified using the dot notation, e.g. `a.b.c` </span> | +| `hoodie.datasource.write.keygenerator.class` | N | SimpleAvroKeyGenerator.class | <span style="color:grey"> Key generator class that will extract the key out of the incoming record </span> | +| `write.tasks` | N | 4 | <span style="color:grey"> Parallelism of tasks that do the actual write, default is 4 </span> | +| `write.batch.size.MB` | N | 128 | <span style="color:grey"> Batch buffer size in MB to flush data into the underlying filesystem </span> | + +If the table type is MERGE_ON_READ, you can also specify the asynchronous compaction strategy through options: + +| Option Name | Required | Default | Remarks | +| ----------- | ------- | ------- | ------- | +| `compaction.async.enabled` | N | true | <span style="color:grey"> Async compaction, enabled by default for MOR </span> | +| `compaction.trigger.strategy` | N | num_commits | <span style="color:grey"> Strategy to trigger compaction; options are 'num_commits': trigger compaction when N delta commits are reached; 'time_elapsed': trigger compaction when time elapsed > N seconds since the last compaction; 'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 'num_or_time': trigger compaction when either NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits' </span> | +| `compaction.delta_commits` | N | 5 | <span style="color:grey"> Max delta commits needed to trigger compaction, default 5 commits </span> | +| `compaction.delta_seconds` | N | 3600 | <span style="color:grey"> Max delta seconds needed to trigger compaction, default 1 hour </span> | + +### Read Options + +| Option Name | Required | Default | Remarks | +| ----------- | ------- | ------- | ------- | +| `path` | Y | N/A | <span style="color:grey"> Base path for the target hoodie table.
The path is created if it does not exist; otherwise a Hudi table is expected to have been initialized successfully </span> | +| `table.type` | N | COPY_ON_WRITE | <span style="color:grey"> Type of table to write. COPY_ON_WRITE (or) MERGE_ON_READ </span> | +| `read.tasks` | N | 4 | <span style="color:grey"> Parallelism of tasks that do the actual read, default is 4 </span> | +| `read.avro-schema.path` | N | N/A | <span style="color:grey"> Avro schema file path; the parsed schema is used for deserialization. If not specified, the avro schema is inferred from the table DDL </span> | +| `read.avro-schema` | N | N/A | <span style="color:grey"> Avro schema string; the parsed schema is used for deserialization. If not specified, the avro schema is inferred from the table DDL </span> | +| `hoodie.datasource.query.type` | N | snapshot | <span style="color:grey"> Decides how data files are read: 1) Snapshot mode (obtain latest view, based on row & columnar data); 2) incremental mode (new data since an instantTime), not supported yet; 3) Read Optimized mode (obtain latest view, based on columnar data). Default: snapshot </span> | +| `hoodie.datasource.merge.type` | N | payload_combine | <span style="color:grey"> For snapshot queries on a merge-on-read table. Use this key to define how the payloads are merged: 1) skip_merge: read the base file records plus the log file records; 2) payload_combine: read the base file records first; for each record in the base file, check whether the key is in the log file records (combining the two records with the same key for base and log file records), then read the remaining log file records </span> | +| `hoodie.datasource.hive_style_partition` | N | false | <span style="color:grey"> Whether the partition path follows Hive style, e.g. '{partition key}={partition value}', default false </span> | +| `read.utc-timezone` | N | true | <span style="color:grey"> Use UTC timezone or local timezone for the conversion between epoch time and LocalDateTime.
Hive 0.x/1.x/2.x use the local timezone, but Hive 3.x uses UTC; by default true </span> | + +If the table type is MERGE_ON_READ, streaming read is supported through options: + +| Option Name | Required | Default | Remarks | +| ----------- | ------- | ------- | ------- | +| `read.streaming.enabled` | N | false | <span style="color:grey"> Whether to read as a streaming source, default false </span> | +| `read.streaming.check-interval` | N | 60 | <span style="color:grey"> Check interval for streaming read, in seconds; default 60 (1 minute) </span> | +| `read.streaming.start-commit` | N | N/A | <span style="color:grey"> Start commit instant for streaming read; the commit time format should be 'yyyyMMddHHmmss'. By default, reading starts from the latest instant </span> | + +## WriteClient Configs {#writeclient-configs} + +Jobs programming directly against the RDD-level APIs can build a `HoodieWriteConfig` object and pass it in to the `HoodieWriteClient` constructor. +`HoodieWriteConfig` can be built using a builder pattern as below. + +```java +HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder() + .withPath(basePath) + .forTable(tableName) + .withSchema(schemaStr) + .withProps(props) // pass raw k,v pairs from a property file. + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withXXX(...).build()) + .withIndexConfig(HoodieIndexConfig.newBuilder().withXXX(...).build()) + ... + .build(); +``` + +The following subsections go over different aspects of write configs, explaining the most important configs along with their property names and default values. + +#### withPath(hoodie_base_path) {#withPath} +Property: `hoodie.base.path` [Required] <br/> +<span style="color:grey">Base DFS path under which all the data partitions are created. Always prefix it explicitly with the storage scheme (e.g. hdfs://, s3://, etc). Hudi stores all the main metadata about commits, savepoints, cleaning audit logs, etc. in the .hoodie directory under the base directory.
</span> + +#### withSchema(schema_str) {#withSchema} +Property: `hoodie.avro.schema` [Required]<br/> +<span style="color:grey">This is the current reader avro schema for the table, as a string of the entire schema. `HoodieWriteClient` passes this schema on to implementations of `HoodieRecordPayload` to convert from the source format to an avro record. It is also used when re-writing records during an update. </span> + +#### forTable(table_name) {#forTable} +Property: `hoodie.table.name` [Required] <br/> + <span style="color:grey">Table name that will be used for registering with Hive. Needs to be the same across runs.</span> + +#### withBulkInsertParallelism(bulk_insert_parallelism = 1500) {#withBulkInsertParallelism} +Property: `hoodie.bulkinsert.shuffle.parallelism`<br/> +<span style="color:grey">Bulk insert is meant to be used for large initial imports, and this parallelism determines the initial number of files in your table. Tune this to achieve the desired optimal size during initial import.</span> + +#### withUserDefinedBulkInsertPartitionerClass(className = x.y.z.UserDefinedPartitionerClass) {#withUserDefinedBulkInsertPartitionerClass} +Property: `hoodie.bulkinsert.user.defined.partitioner.class`<br/> +<span style="color:grey">If specified, this class will be used to re-partition input records before they are inserted.</span> + +#### withBulkInsertSortMode(mode = BulkInsertSortMode.GLOBAL_SORT) {#withBulkInsertSortMode} +Property: `hoodie.bulkinsert.sort.mode`<br/> Review comment: I don't see "hoodie.datasource.write.row.writer.enable". I assume you are going to add it in a follow-up patch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org