Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22121#discussion_r211987726
  
    --- Diff: docs/avro-data-source-guide.md ---
    @@ -0,0 +1,377 @@
    +---
    +layout: global
    +title: Apache Avro Data Source Guide
    +---
    +
    +* This will become a table of contents (this text will be scraped).
    +{:toc}
    +
    +Since the Spark 2.4 release, [Spark 
SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides 
built-in support for reading and writing Apache Avro data.
    +
    +## Deploying
    +The `spark-avro` module is external and not included in `spark-submit` or 
`spark-shell` by default.
    +
    +As with any Spark application, `spark-submit` is used to launch your 
application. `spark-avro_{{site.SCALA_BINARY_VERSION}}`
    +and its dependencies can be directly added to `spark-submit` using 
`--packages`, such as,
    +
    +    ./bin/spark-submit --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
    +
    +For experimenting on `spark-shell`, you can also use `--packages` to add 
`org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its 
dependencies directly,
    +
    +    ./bin/spark-shell --packages 
org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}}
 ...
    +
    +See [Application Submission Guide](submitting-applications.html) for more 
details about submitting applications with external dependencies.
    +
    +## Load and Save Functions
    +
    +Since the `spark-avro` module is external, there is no `.avro` API in 
    +`DataFrameReader` or `DataFrameWriter`.
    +
    +To load/save data in Avro format, you need to specify the data source 
option `format` as `avro` (or `org.apache.spark.sql.avro`).
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +
    +val usersDF = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
    +usersDF.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +
    +Dataset<Row> usersDF = 
spark.read().format("avro").load("examples/src/main/resources/users.avro");
    +usersDF.select("name", 
"favorite_color").write().format("avro").save("namesAndFavColors.avro");
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="python" markdown="1">
    +{% highlight python %}
    +
    +df = 
spark.read.format("avro").load("examples/src/main/resources/users.avro")
    +df.select("name", 
"favorite_color").write.format("avro").save("namesAndFavColors.avro")
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="r" markdown="1">
    +{% highlight r %}
    +
    +df <- read.df("examples/src/main/resources/users.avro", "avro")
    +write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", 
"avro")
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## to_avro() and from_avro()
    +Spark SQL provides the function `to_avro()` to encode a column (for example, a struct) as binary in Avro format and 
`from_avro()` to decode Avro binary data back into a column of a complex type.
    +
    +Using Avro records as columns is useful when reading from or writing to a 
streaming source like Kafka. Each 
    +Kafka key-value record will be augmented with some metadata, such as the 
ingestion timestamp into Kafka, the offset in Kafka, etc.
    +* If the "value" field that contains your data is in Avro, you could use 
`from_avro()` to extract your data, enrich it, clean it, and then push it 
downstream to Kafka again or write it out to a file.
    +* `to_avro()` can be used to turn structs into Avro records. This method 
is particularly useful when you would like to re-encode multiple columns into a 
single one when writing data out to Kafka.
    +
    +Both methods are presently only available in Scala and Java.
    +
    +<div class="codetabs">
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +import java.nio.file.{Files, Paths}
    +
    +import org.apache.spark.sql.avro._
    +
    +// `from_avro` requires Avro schema in JSON string format.
    +val jsonFormatSchema = new 
String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
    +
    +val df = spark
    +  .readStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("subscribe", "topic1")
    +  .load()
    +
    +// 1. Decode the Avro data into a struct;
    +// 2. Filter by column `favorite_color`;
    +// 3. Encode the column `name` in Avro format.
    +val output = df
    +  .select(from_avro('value, jsonFormatSchema) as 'user)
    +  .where("user.favorite_color == \"red\"")
    +  .select(to_avro($"user.name") as 'value)
    +
    +val ds = output
    +  .writeStream
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic2")
    +  .start()
    +
    +{% endhighlight %}
    +</div>
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
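    +import java.nio.file.Files;
    +import java.nio.file.Paths;
    +
    +import static org.apache.spark.sql.functions.col;
    +import org.apache.spark.sql.avro.*;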
    +
    +// `from_avro` requires Avro schema in JSON string format.
    +String jsonFormatSchema = new 
String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
    +
    +Dataset<Row> df = spark
    +  .readStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("subscribe", "topic1")
    +  .load();
    +
    +// 1. Decode the Avro data into a struct;
    +// 2. Filter by column `favorite_color`;
    +// 3. Encode the column `name` in Avro format.
    +Dataset<Row> output = df
    +  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
    +  .where("user.favorite_color == \"red\"")
    +  .select(to_avro(col("user.name")).as("value"));
    +
    +StreamingQuery ds = output
    +  .writeStream()
    +  .format("kafka")
    +  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    +  .option("topic", "topic2")
    +  .start();
    +
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +## Data Source Option
    +
    +Data source options of Avro can be set using the `.option` method on 
`DataFrameReader` or `DataFrameWriter`.
    +<table class="table">
    +  <tr><th><b>Property 
Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
    +  <tr>
    +    <td><code>avroSchema</code></td>
    +    <td>None</td>
    +    <td>Optional Avro schema provided by a user in JSON format.</td>
    +    <td>read and write</td>
    +  </tr>
    +  <tr>
    +    <td><code>recordName</code></td>
    +    <td>topLevelRecord</td>
    +    <td>Top level record name in write result, which is required by the Avro 
spec.</td>
    +    <td>write</td>
    +  </tr>
    +  <tr>
    +    <td><code>recordNamespace</code></td>
    +    <td>""</td>
    +    <td>Record namespace in write result.</td>
    +    <td>write</td>
    +  </tr>
    +  <tr>
    +    <td><code>ignoreExtension</code></td>
    +    <td>true</td>
    +    <td>The option controls ignoring of files without <code>.avro</code> 
extensions in read.<br> If the option is enabled, all files (with and without 
<code>.avro</code> extension) are loaded.</td>
    +    <td>read</td>
    +  </tr>
    +  <tr>
    +    <td><code>compression</code></td>
    +    <td>snappy</td>
    +    <td>The <code>compression</code> option specifies the 
compression codec used in write.<br>
    +  Currently supported codecs are <code>uncompressed</code>, 
<code>snappy</code>, <code>deflate</code>, <code>bzip2</code> and 
<code>xz</code>.<br> If the option is not set, the configuration 
<code>spark.sql.avro.compression.codec</code> is used.</td>
    +    <td>write</td>
    +  </tr>
    +</table>
    +
    +## Configuration
    +Configuration of Avro can be done using the `setConf` method on 
SparkSession or by running `SET key=value` commands using SQL.
    +<table class="table">
    +  <tr><th><b>Property 
Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
    +  <tr>
    +    <td>spark.sql.legacy.replaceDatabricksSparkAvro.enabled</td>
    +    <td>true</td>
    +    <td>If it is set to true, the data source provider 
<code>com.databricks.spark.avro</code> is mapped to the built-in but external 
Avro data source module for backward compatibility.</td>
    +  </tr>
    +  <tr>
    +    <td>spark.sql.avro.compression.codec</td>
    +    <td>snappy</td>
    +    <td>Compression codec used in writing of AVRO files. Supported codecs: 
uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.</td>
    +  </tr>
    +  <tr>
    +    <td>spark.sql.avro.deflate.level</td>
    +    <td>-1</td>
    +    <td>Compression level for the deflate codec used in writing of AVRO 
files. Valid values are 1 to 9 inclusive, or -1. The 
default value is -1, which corresponds to level 6 in the current 
implementation.</td>
    +  </tr>
    +</table>
    +
    +## Compatibility with Databricks spark-avro
    +This Avro data source module is originally from and compatible with 
Databricks's open source repository 
    +[spark-avro](https://github.com/databricks/spark-avro).
    +
    +By default with the SQL configuration 
`spark.sql.legacy.replaceDatabricksSparkAvro.enabled` enabled, the data source 
provider `com.databricks.spark.avro` is 
    +mapped to this built-in Avro module. For the Spark tables created with 
`Provider` property as `com.databricks.spark.avro` in 
    +catalog meta store, the mapping is essential to load these tables if you 
are using this built-in Avro module. 
    +
    +Note in Databricks's 
[spark-avro](https://github.com/databricks/spark-avro), implicit classes 
    +`AvroDataFrameWriter` and `AvroDataFrameReader` were created for shortcut 
function `.avro()`. In this 
    +built-in but external module, both implicit classes are removed. Please 
use `.format("avro")` in 
    +`DataFrameWriter` or `DataFrameReader` instead, which should be clean and 
good enough.
    +
    +If you prefer using your own build of `spark-avro` jar file, you can 
simply disable the configuration 
    +`spark.sql.legacy.replaceDatabricksSparkAvro.enabled`, and use the option 
`--jars` on deploying your 
    +applications. Read the [Advanced Dependency 
Management](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) 
section in Application 
    +Submission Guide for more details. 
    +
    +## Supported types for Avro -> Spark SQL conversion
    +Currently Spark supports reading all [primitive 
types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and 
[complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) of 
Avro.
    +<table class="table">
    +  <tr><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr>
    +  <tr>
    +    <td>boolean</td>
    +    <td>BooleanType</td>
    +  </tr>
    +  <tr>
    +    <td>int</td>
    +    <td>IntegerType</td>
    --- End diff --
    
    Byte and Short both map to avro int, right?
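
To make the question concrete, here is a minimal sketch of the round trip I have in mind. The write-side entries for `ByteType` and `ShortType` are my assumption about how the writer widens narrow integer types; they are not taken from this diff. `SPARK_TO_AVRO`, `AVRO_TO_SPARK` and `round_trip` are hypothetical names used only for illustration.

```python
# Assumed Spark SQL -> Avro mapping on write (illustrative, not copied from the PR).
SPARK_TO_AVRO = {
    "BooleanType": "boolean",
    "ByteType": "int",     # assumption: widened to Avro int
    "ShortType": "int",    # assumption: widened to Avro int
    "IntegerType": "int",
}

# Avro -> Spark SQL mapping on read, as listed in the table under review.
AVRO_TO_SPARK = {
    "boolean": "BooleanType",
    "int": "IntegerType",
}


def round_trip(spark_type: str) -> str:
    """Return the Spark SQL type seen after writing to Avro and reading back."""
    return AVRO_TO_SPARK[SPARK_TO_AVRO[spark_type]]


for t in ("ByteType", "ShortType", "IntegerType"):
    print(t, "->", SPARK_TO_AVRO[t], "->", round_trip(t))
```

If that assumption holds, a column written as `ByteType` or `ShortType` comes back as `IntegerType`, which may be worth stating in the guide.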

